Skip to content

Commit e99a9b4

Browse files
committed
chore: cluster migration cancellation on shutdown
1 parent 6b67f44 commit e99a9b4

File tree

6 files changed

+44
-2
lines changed

6 files changed

+44
-2
lines changed

src/server/cluster/cluster_config.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,13 @@ std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(
300300
return new_config;
301301
}
302302

303+
std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithoutMigrations() const {
304+
auto new_config = std::make_shared<ClusterConfig>(*this);
305+
new_config->my_incoming_migrations_.clear();
306+
new_config->my_outgoing_migrations_.clear();
307+
return new_config;
308+
}
309+
303310
bool ClusterConfig::IsMySlot(SlotId id) const {
304311
if (id > cluster::kMaxSlotNum) {
305312
DCHECK(false) << "Requesting a non-existing slot id " << id;

src/server/cluster/cluster_config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class ClusterConfig {
2626
std::shared_ptr<ClusterConfig> CloneWithChanges(const SlotRanges& enable_slots,
2727
const SlotRanges& disable_slots) const;
2828

29+
std::shared_ptr<ClusterConfig> CloneWithoutMigrations() const;
30+
2931
// If key is in my slots ownership return true
3032
bool IsMySlot(SlotId id) const;
3133
bool IsMySlot(std::string_view key) const;

src/server/cluster/cluster_family.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ ClusterConfig* ClusterFamily::cluster_config() {
7979
return tl_cluster_config.get();
8080
}
8181

82+
void ClusterFamily::Shutdown() {
83+
shard_set->pool()->at(0)->Await([this] {
84+
auto empty_config = tl_cluster_config->CloneWithoutMigrations();
85+
RemoveOutgoingMigrations(empty_config, tl_cluster_config);
86+
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
87+
88+
DCHECK(outgoing_migration_jobs_.empty());
89+
DCHECK(incoming_migrations_jobs_.empty());
90+
});
91+
}
92+
8293
ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
8394
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
8495
.master = {},

src/server/cluster/cluster_family.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class ClusterFamily {
2828

2929
void Register(CommandRegistry* registry);
3030

31+
void Shutdown();
32+
3133
// Returns a thread-local pointer.
3234
static ClusterConfig* cluster_config();
3335

src/server/main_service.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,9 @@ void Service::Shutdown() {
910910

911911
config_registry.Reset();
912912

913-
// to shutdown all the runtime components that depend on EngineShard.
913+
// to shutdown all the runtime components that depend on EngineShard
914+
cluster_family_.Shutdown();
915+
914916
server_family_.Shutdown();
915917
StringFamily::Shutdown();
916918
GenericFamily::Shutdown();

tests/dragonfly/cluster_test.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1117,8 +1117,9 @@ async def test_cluster_flushall_during_migration(
11171117
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
11181118

11191119

1120+
@pytest.mark.parametrize("interrupt", [False, True])
11201121
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
1121-
async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
1122+
async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
11221123
# Check data migration from one node to another
11231124
instances = [
11241125
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
@@ -1145,6 +1146,23 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
11451146
logging.debug("Start migration")
11461147
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
11471148

1149+
if interrupt: # Test nodes properly shut down with pending migration
1150+
await asyncio.sleep(random.random())
1151+
1152+
# random instance
1153+
stop = random.getrandbits(1)
1154+
keep = 1 - stop
1155+
1156+
nodes[stop].instance.stop()
1157+
1158+
slots = await nodes[keep].admin_client.execute_command("CLUSTER SLOTS")
1159+
slots.sort(key=lambda cfg: cfg[0])
1160+
assert 0 in slots[0] and 9000 in slots[0]
1161+
assert 9001 in slots[1] and 16383 in slots[1]
1162+
1163+
await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes])
1164+
return
1165+
11481166
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
11491167

11501168
for i in range(20, 22):

0 commit comments

Comments
 (0)