
Commit b567afd

chore: cancel slot migrations on shutdown
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent: 6b67f44

8 files changed: +58 −16 lines

src/server/cluster/cluster_family.cc (24 additions, 1 deletion)

@@ -79,6 +79,29 @@ ClusterConfig* ClusterFamily::cluster_config() {
   return tl_cluster_config.get();
 }
 
+void ClusterFamily::Shutdown() {
+  vector<shared_ptr<IncomingSlotMigration>> incoming;
+  vector<shared_ptr<OutgoingMigration>> outgoing;
+  {
+    std::lock_guard lk(migration_mu_);
+    incoming = std::move(incoming_migrations_jobs_);
+    outgoing = std::move(outgoing_migration_jobs_);
+  }
+
+  for (auto& migration : incoming)
+    migration->Stop(true);
+  for (auto& migration : outgoing)
+    migration->Finish(true, true);
+
+  {
+    std::lock_guard lk(migration_mu_);
+    DCHECK(incoming_migrations_jobs_.empty());  // can't initiate migrations in shutdown state
+    DCHECK(outgoing_migration_jobs_.empty());
+    incoming_migrations_jobs_.clear();
+    outgoing_migration_jobs_.clear();
+  }
+}
+
 ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
   ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
                         .master = {},

@@ -784,7 +807,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
   SlotSet migration_slots(migration->GetSlots());
   SlotSet removed = migration_slots.GetRemovedSlots(tl_cluster_config->GetOwnedSlots());
 
-  migration->Stop();
+  migration->Stop(false);
   // all fibers has migration shared_ptr so we don't need to join it and can erase
   jobs.erase(it);
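Shutdown() moves the per-direction job vectors out of the shared state while holding migration_mu_, then stops each job with the mutex released, so Stop()/Finish() can safely re-enter code that takes the same mutex. Below is a minimal, self-contained sketch of that snapshot-under-lock pattern with a hypothetical Job type; it is an illustration of the idea, not Dragonfly's actual classes.

// Sketch of the snapshot-under-lock pattern (hypothetical Job/JobRegistry).
// Jobs are moved out of the shared container under the mutex, then stopped
// with the mutex released, so a job's Stop() may take the same mutex again
// without deadlocking.
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

struct Job {
  void Stop() { /* cancel the job's fibers/flows */ }
};

class JobRegistry {
 public:
  void Shutdown() {
    std::vector<std::shared_ptr<Job>> snapshot;
    {
      std::lock_guard lk(mu_);
      snapshot = std::move(jobs_);  // leaves jobs_ empty
    }
    for (auto& job : snapshot)
      job->Stop();  // mu_ is not held here
  }

 private:
  std::mutex mu_;
  std::vector<std::shared_ptr<Job>> jobs_;
};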
src/server/cluster/cluster_family.h (2 additions, 0 deletions)

@@ -28,6 +28,8 @@ class ClusterFamily {
 
   void Register(CommandRegistry* registry);
 
+  void Shutdown();
+
   // Returns a thread-local pointer.
   static ClusterConfig* cluster_config();
 
src/server/cluster/incoming_slot_migration.cc (8 additions, 5 deletions)

@@ -165,17 +165,20 @@ bool IncomingSlotMigration::Join(long attempt) {
   }
 }
 
-void IncomingSlotMigration::Stop() {
+void IncomingSlotMigration::Stop(bool join) {
   string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
   LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
   cntx_.Cancel();
 
   for (auto& flow : shard_flows_) {
-    if (auto err = flow->Cancel(); err) {
-      VLOG(1) << "Error during flow Stop: " << err;
-    }
+    auto err = flow->Cancel();
+    VLOG_IF(1, err) << "Error during flow Stop: " << err;
   }
-  bc_->Cancel();
+
+  if (join)
+    bc_->Wait();
+  else
+    bc_->Cancel();
 }
 
 void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
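Stop(bool join) now distinguishes two modes for the flow fibers tracked by the bc_ counter: join == true (used by ClusterFamily::Shutdown) waits until every flow has finished, while join == false (used when a migration is removed) keeps the old behavior of cancelling the counter so waiters return immediately. The sketch below is a hypothetical, simplified stand-in for such a blocking counter, assuming Wait/Dec/Cancel semantics; the real helio primitive may differ.

// Hypothetical, simplified blocking counter (not helio's actual class).
// Wait() blocks until every flow has called Dec(); Cancel() releases
// waiters immediately without waiting for the remaining flows.
#include <condition_variable>
#include <mutex>

class SimpleBlockingCounter {
 public:
  explicit SimpleBlockingCounter(unsigned count) : count_(count) {}

  void Dec() {  // called by each flow when it finishes
    std::lock_guard lk(mu_);
    if (count_ > 0 && --count_ == 0)
      cv_.notify_all();
  }

  void Wait() {  // Stop(true): join all flows before returning
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0 || cancelled_; });
  }

  void Cancel() {  // Stop(false): unblock waiters without joining
    std::lock_guard lk(mu_);
    cancelled_ = true;
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
  bool cancelled_ = false;
};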

src/server/cluster/incoming_slot_migration.h (3 additions, 2 deletions)

@@ -32,8 +32,9 @@ class IncomingSlotMigration {
   // After Join we still can get data due to error situation
   [[nodiscard]] bool Join(long attempt);
 
-  // Stop migrations, can be called even after migration is finished
-  void Stop();
+  // Stop migrations, can be called even after migration is finished.
+  // If join is set, wait for flows to finish
+  void Stop(bool join);
 
   MigrationState GetState() const {
     return state_.load();

src/server/cluster/outgoing_slot_migration.cc (6 additions, 4 deletions)

@@ -130,7 +130,7 @@ void OutgoingMigration::OnAllShards(
   });
 }
 
-void OutgoingMigration::Finish(bool is_error) {
+void OutgoingMigration::Finish(bool is_error, bool force_finish) {
   VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
           << migration_info_.node_info.id;
   bool should_cancel_flows = false;

@@ -155,10 +155,12 @@ void OutgoingMigration::Finish(bool is_error) {
     state_ = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
   }
 
-  if (should_cancel_flows) {
+  if (force_finish || should_cancel_flows) {
     OnAllShards([](auto& migration) {
-      CHECK(migration != nullptr);
-      migration->Cancel();
+      if (migration) {
+        CHECK(migration != nullptr);
+        migration->Cancel();
+      }
     });
   }
 }
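Finish() previously cancelled the shard flows only when its internal state transition said so (should_cancel_flows); the new force_finish argument lets ClusterFamily::Shutdown cancel them unconditionally, even if the migration has already reached a terminal state. Below is a minimal, self-contained sketch of that force-flag pattern, using a hypothetical Migration type rather than Dragonfly's.

// Hypothetical illustration of the force-flag pattern: the cancellation
// decision made under the state mutex can be overridden by the caller, so
// a shutdown path always reaches the cancellation step.
#include <iostream>
#include <mutex>

class Migration {
 public:
  void Finish(bool is_error, bool force_finish = false) {
    bool should_cancel = false;
    {
      std::lock_guard lk(mu_);
      if (!finished_) {  // only the first Finish() decides to cancel
        finished_ = true;
        should_cancel = true;
      }
      error_ = error_ || is_error;
    }
    if (force_finish || should_cancel)
      CancelFlows();
  }

 private:
  void CancelFlows() { std::cout << "cancelling shard flows\n"; }

  std::mutex mu_;
  bool finished_ = false;
  bool error_ = false;
};

int main() {
  Migration m;
  m.Finish(/*is_error=*/false);                       // normal completion path
  m.Finish(/*is_error=*/true, /*force_finish=*/true);  // shutdown: forced cancel
}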

src/server/cluster/outgoing_slot_migration.h (3 additions, 2 deletions)

@@ -31,8 +31,9 @@ class OutgoingMigration : private ProtocolClient {
   void Start();
 
   // mark migration as FINISHED and cancel migration if it's not finished yet
-  // can be called from any thread, but only after Start()
-  void Finish(bool is_error = false);
+  // can be called from any thread, but only after Start().
+  // force_finish forcefully cancels shard flows
+  void Finish(bool is_error = false, bool force_finish = false);
 
   MigrationState GetState() const;

src/server/main_service.cc (3 additions, 1 deletion)

@@ -910,7 +910,9 @@ void Service::Shutdown() {
 
   config_registry.Reset();
 
-  // to shutdown all the runtime components that depend on EngineShard.
+  // to shutdown all the runtime components that depend on EngineShard
+  cluster_family_.Shutdown();
+
   server_family_.Shutdown();
   StringFamily::Shutdown();
   GenericFamily::Shutdown();
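The new call sits at the head of the component-shutdown sequence so that slot migrations, which run fibers against engine shards, are cancelled and joined before server_family_ and the data-type families are torn down. A tiny sketch of that dependency-ordered shutdown idea, with a hypothetical Component interface rather than Dragonfly's real types:

// Hypothetical illustration: components are shut down strictly in dependency
// order, so anything still running background work against the shards
// (here, slot migrations) is stopped before the facilities it relies on.
#include <vector>

struct Component {
  virtual ~Component() = default;
  virtual void Shutdown() = 0;
};

void ShutdownAll(const std::vector<Component*>& in_dependency_order) {
  for (Component* c : in_dependency_order)
    c->Shutdown();  // e.g. cluster family first, then server family, then shards
}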

tests/dragonfly/cluster_test.py (9 additions, 1 deletion)

@@ -1117,8 +1117,9 @@ async def test_cluster_flushall_during_migration(
     await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
 
 
+@pytest.mark.parametrize("interrupt", [False, True])
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
+async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
     # Check data migration from one node to another
     instances = [
         df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)

@@ -1145,6 +1146,13 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
     logging.debug("Start migration")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
+    if interrupt:  # Test nodes properly shut down with pending migration
+        await asyncio.sleep(random.random())
+        await close_clients(
+            *[node.client for node in nodes], *[node.admin_client for node in nodes]
+        )
+        return
+
     await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
 
     for i in range(20, 22):
