Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 56 additions & 47 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
BASE_PORT = 30001


def monotonically_increasing_port_number():
port = BASE_PORT
while True:
yield port
port = port + 1


# Create a generator object
next_port = monotonically_increasing_port_number()


class RedisClusterNode:
def __init__(self, port):
self.port = port
Expand Down Expand Up @@ -279,8 +290,8 @@ def is_local_host(ip: str) -> bool:
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
async def test_emulated_cluster_with_replicas(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replicas = [df_factory.create(port=next(next_port), logtostdout=True) for i in range(1, 3)]

df_factory.start_all([master, *replicas])

Expand Down Expand Up @@ -379,8 +390,8 @@ async def test_emulated_cluster_with_replicas(df_factory):

@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_managed_service_info(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))

df_factory.start_all([master, replica])

Expand Down Expand Up @@ -561,7 +572,7 @@ async def test_cluster_nodes(df_server, async_client):

@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
async def test_cluster_node_id(df_factory: DflyInstanceFactory):
node = df_factory.create(port=BASE_PORT)
node = df_factory.create(port=next(next_port))
df_factory.start_all([node])

conn = node.client()
Expand All @@ -571,9 +582,7 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
# Start and configure cluster with 2 nodes
nodes = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
]
nodes = [df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)]

df_factory.start_all(nodes)

Expand Down Expand Up @@ -640,7 +649,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
await c_nodes[1].set("KEY1", "value")
assert False, "Should not be able to set key on non-owner cluster node"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "MOVED 5259 localhost:30001"
assert e.args[0] == f"MOVED 5259 localhost:{nodes[0].port}"

# And that node1 only has 1 key ("KEY2")
assert await c_nodes[1].execute_command("DBSIZE") == 1
Expand All @@ -664,7 +673,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
await c_nodes[0].set("KEY1", "value")
assert False, "Should not be able to set key on non-owner cluster node"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "MOVED 5259 localhost:30002"
assert e.args[0] == f"MOVED 5259 localhost:{nodes[1].port}"

# And node1 should own it and allow using it
assert await c_nodes[1].set("KEY1", "value")
Expand Down Expand Up @@ -699,8 +708,8 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_factory.create(admin_port=BASE_PORT + 1000)
replica = df_factory.create(admin_port=BASE_PORT + 1001)
master = df_factory.create(admin_port=next(next_port))
replica = df_factory.create(admin_port=next(next_port))
df_factory.start_all([master, replica])

async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
Expand Down Expand Up @@ -807,8 +816,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
df_factory.start_all([master, replica])

c_master = master.client()
Expand Down Expand Up @@ -958,7 +967,7 @@ async def test_cluster_blocking_command(df_server):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -987,11 +996,11 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory):

with pytest.raises(aioredis.ResponseError) as set_e_info:
await set_task
assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)
assert f"MOVED 3037 127.0.0.1:{instances[1].port}" == str(set_e_info.value)

with pytest.raises(aioredis.ResponseError) as list_e_info:
await list_task
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
assert f"MOVED 7141 127.0.0.1:{instances[1].port}" == str(list_e_info.value)


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
Expand All @@ -1004,8 +1013,8 @@ async def test_cluster_native_client(
# Start and configure cluster with 3 masters and 3 replicas
masters = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
cluster_node_id=f"master{i}" if set_cluster_node_id else "",
)
for i in range(3)
Expand All @@ -1017,10 +1026,10 @@ async def test_cluster_native_client(

replicas = [
df_factory.create(
port=BASE_PORT + 100 + i,
admin_port=BASE_PORT + i + 1100,
port=next(next_port),
admin_port=next(next_port),
cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
replicaof=f"localhost:{BASE_PORT + i}",
replicaof=f"localhost:{masters[i].port}",
)
for i in range(3)
]
Expand Down Expand Up @@ -1195,7 +1204,7 @@ async def test_random_keys():
async def test_config_consistency(df_factory: DflyInstanceFactory):
# Check slot migration from one node to another
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1245,8 +1254,8 @@ async def test_cluster_flushall_during_migration(
# Check data migration from one node to another
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
logtostdout=True,
)
Expand Down Expand Up @@ -1298,8 +1307,8 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
# Check data migration from one node to another
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
)
for i in range(2)
Expand Down Expand Up @@ -1378,7 +1387,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
async def test_migration_with_key_ttl(df_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1427,7 +1436,7 @@ async def test_migration_with_key_ttl(df_factory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0})
async def test_network_disconnect_during_migration(df_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1496,8 +1505,8 @@ async def test_cluster_fuzzymigration(
):
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
serialization_max_chunk_size=huge_values,
replication_stream_output_limit=10,
Expand Down Expand Up @@ -1632,7 +1641,7 @@ async def test_all_finished():
async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
"""Check data migration from one node to another."""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1690,7 +1699,7 @@ async def test_cluster_replication_migration(
and make sure the captures on the replicas are equal.
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(4)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1767,7 +1776,7 @@ async def test_start_replication_during_migration(
in the end master_1 and replica_1 should have the same data
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(3)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(3)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1834,7 +1843,7 @@ async def test_snapshoting_during_migration(
The result should be the same: snapshot contains all the data that existed before migration
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1904,7 +1913,7 @@ async def start_save():
async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
"""Check data migration from one node to another."""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1965,7 +1974,7 @@ async def node1size0():
@pytest.mark.asyncio
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -2027,9 +2036,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
cluster_nodes = [
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
]

# Start instances and connect clients
Expand Down Expand Up @@ -2114,9 +2123,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
Promote replica to master
Compare cluster data and replica data
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
cluster_nodes = [
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
]

# Start instances and connect clients
Expand Down Expand Up @@ -2228,7 +2237,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single dragonfly node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")

# Start instances and connect clients
df_factory.start_all([replica])
Expand Down Expand Up @@ -2286,7 +2295,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
Send more traffic
Promote the replica to master and check data consistency between cluster and single dragonfly node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")

# Start instances and connect clients
df_factory.start_all([replica])
Expand Down Expand Up @@ -2371,8 +2380,8 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
instances = [
df_factory.create(
maxmemory="15G",
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="streamer=9",
)
for i in range(3)
Expand Down Expand Up @@ -2429,8 +2438,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
# Timeout set to 3 seconds because we must first saturate the socket before we get the timeout
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
replication_timeout=3000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2",
)
Expand Down
Loading