Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(
return new_config;
}

// Builds a copy of this config that carries no migration state: the copy's
// incoming and outgoing migration members are both cleared, everything else
// is taken over unchanged from *this.
std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithoutMigrations() const {
  auto stripped = std::make_shared<ClusterConfig>(*this);
  stripped->my_outgoing_migrations_.clear();
  stripped->my_incoming_migrations_.clear();
  return stripped;
}

bool ClusterConfig::IsMySlot(SlotId id) const {
if (id > cluster::kMaxSlotNum) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class ClusterConfig {
std::shared_ptr<ClusterConfig> CloneWithChanges(const SlotRanges& enable_slots,
const SlotRanges& disable_slots) const;

// Returns a copy of this config whose incoming and outgoing migration
// members have been cleared.
std::shared_ptr<ClusterConfig> CloneWithoutMigrations() const;

// If key is in my slots ownership return true
bool IsMySlot(SlotId id) const;
bool IsMySlot(std::string_view key) const;
Expand Down
15 changes: 15 additions & 0 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@ ClusterConfig* ClusterFamily::cluster_config() {
return tl_cluster_config.get();
}

void ClusterFamily::Shutdown() {
shard_set->pool()->at(0)->Await([this] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why at 0?

Copy link
Contributor Author

@dranikpg dranikpg Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't like this line either, but given the way we initialize shards, threads 0..n are always shards, while n..m can be connection handler threads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should we run this command on the shards? I thought we could just run it in the same thread

lock_guard lk(set_config_mu);
if (!tl_cluster_config)
return;

auto empty_config = tl_cluster_config->CloneWithoutMigrations();
RemoveOutgoingMigrations(empty_config, tl_cluster_config);
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));

DCHECK(outgoing_migration_jobs_.empty());
DCHECK(incoming_migrations_jobs_.empty());
});
}

ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
.master = {},
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class ClusterFamily {

void Register(CommandRegistry* registry);

// Called during service shutdown. If a cluster config is set, hands its
// migrations to RemoveOutgoingMigrations()/RemoveIncomingMigrations(); no
// outgoing or incoming migration jobs are expected to remain afterwards
// (checked with DCHECK).
void Shutdown();

// Returns a thread-local pointer.
static ClusterConfig* cluster_config();

Expand Down
4 changes: 3 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,9 @@ void Service::Shutdown() {

config_registry.Reset();

// to shutdown all the runtime components that depend on EngineShard.
// to shutdown all the runtime components that depend on EngineShard
cluster_family_.Shutdown();

server_family_.Shutdown();
StringFamily::Shutdown();
GenericFamily::Shutdown();
Expand Down
20 changes: 19 additions & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1117,8 +1117,9 @@ async def test_cluster_flushall_during_migration(
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@pytest.mark.parametrize("interrupt", [False, True])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
# Check data migration from one node to another
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
Expand All @@ -1145,6 +1146,23 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
logging.debug("Start migration")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

if interrupt: # Test nodes properly shut down with pending migration
await asyncio.sleep(random.random())

# random instance
stop = random.getrandbits(1)
keep = 1 - stop

nodes[stop].instance.stop()

slots = await nodes[keep].admin_client.execute_command("CLUSTER SLOTS")
slots.sort(key=lambda cfg: cfg[0])
assert 0 in slots[0] and 9000 in slots[0]
assert 9001 in slots[1] and 16383 in slots[1]

await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes])
return

await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")

for i in range(20, 22):
Expand Down
Loading