Skip to content

Commit f9f93b1

Browse files
authored
chore: monotonically increasing ports for cluster tests (#4268)
We have cascading failures in cluster tests: when an assertion fails, the nodes are not properly cleaned up, so subsequent test cases that use the same ports fail as well. I added a monotonically increasing port generator to mitigate this effect.
1 parent 63ccbbc commit f9f93b1

File tree

1 file changed

+56
-47
lines changed

1 file changed

+56
-47
lines changed

tests/dragonfly/cluster_test.py

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,17 @@
2222
BASE_PORT = 30001


def monotonically_increasing_port_number():
    """Yield an endless sequence of port numbers: BASE_PORT, BASE_PORT + 1, ...

    Cluster tests used to hard-code the same ports. When an assertion failed,
    the nodes were not properly cleaned up, so later tests that reused those
    ports failed too. Handing every node a never-before-used port keeps one
    failure from cascading into the following tests.

    Yields:
        int: the next unused port number, starting at BASE_PORT and
        increasing by one on every request. The sequence never ends and has
        no upper bound.
    """
    # Function-scope import keeps this helper self-contained.
    import itertools

    # itertools.count is the stdlib equivalent of a manual "yield, then += 1" loop.
    yield from itertools.count(BASE_PORT)


# Module-wide allocator: each next(next_port) call returns a fresh port number.
next_port = monotonically_increasing_port_number()
34+
35+
2536
class RedisClusterNode:
2637
def __init__(self, port):
2738
self.port = port
@@ -279,8 +290,8 @@ def is_local_host(ip: str) -> bool:
279290
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
280291
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
281292
async def test_emulated_cluster_with_replicas(df_factory):
282-
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
283-
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
293+
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
294+
replicas = [df_factory.create(port=next(next_port), logtostdout=True) for i in range(1, 3)]
284295

285296
df_factory.start_all([master, *replicas])
286297

@@ -379,8 +390,8 @@ async def test_emulated_cluster_with_replicas(df_factory):
379390

380391
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
381392
async def test_cluster_managed_service_info(df_factory):
382-
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
383-
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)
393+
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
394+
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
384395

385396
df_factory.start_all([master, replica])
386397

@@ -561,7 +572,7 @@ async def test_cluster_nodes(df_server, async_client):
561572

562573
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
563574
async def test_cluster_node_id(df_factory: DflyInstanceFactory):
564-
node = df_factory.create(port=BASE_PORT)
575+
node = df_factory.create(port=next(next_port))
565576
df_factory.start_all([node])
566577

567578
conn = node.client()
@@ -571,9 +582,7 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory):
571582
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
572583
async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
573584
# Start and configure cluster with 2 nodes
574-
nodes = [
575-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
576-
]
585+
nodes = [df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)]
577586

578587
df_factory.start_all(nodes)
579588

@@ -640,7 +649,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
640649
await c_nodes[1].set("KEY1", "value")
641650
assert False, "Should not be able to set key on non-owner cluster node"
642651
except redis.exceptions.ResponseError as e:
643-
assert e.args[0] == "MOVED 5259 localhost:30001"
652+
assert e.args[0] == f"MOVED 5259 localhost:{nodes[0].port}"
644653

645654
# And that node1 only has 1 key ("KEY2")
646655
assert await c_nodes[1].execute_command("DBSIZE") == 1
@@ -664,7 +673,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
664673
await c_nodes[0].set("KEY1", "value")
665674
assert False, "Should not be able to set key on non-owner cluster node"
666675
except redis.exceptions.ResponseError as e:
667-
assert e.args[0] == "MOVED 5259 localhost:30002"
676+
assert e.args[0] == f"MOVED 5259 localhost:{nodes[1].port}"
668677

669678
# And node1 should own it and allow using it
670679
assert await c_nodes[1].set("KEY1", "value")
@@ -699,8 +708,8 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
699708
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
700709
async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory):
701710
# Start and configure cluster with 1 master and 1 replica, both own all slots
702-
master = df_factory.create(admin_port=BASE_PORT + 1000)
703-
replica = df_factory.create(admin_port=BASE_PORT + 1001)
711+
master = df_factory.create(admin_port=next(next_port))
712+
replica = df_factory.create(admin_port=next(next_port))
704713
df_factory.start_all([master, replica])
705714

706715
async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
@@ -807,8 +816,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
807816
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
808817
async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
809818
# Start and configure cluster with 1 master and 1 replica, both own all slots
810-
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
811-
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
819+
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
820+
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
812821
df_factory.start_all([master, replica])
813822

814823
c_master = master.client()
@@ -958,7 +967,7 @@ async def test_cluster_blocking_command(df_server):
958967
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
959968
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
960969
instances = [
961-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
970+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
962971
]
963972

964973
df_factory.start_all(instances)
@@ -987,11 +996,11 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
987996

988997
with pytest.raises(aioredis.ResponseError) as set_e_info:
989998
await set_task
990-
assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)
999+
assert f"MOVED 3037 127.0.0.1:{instances[1].port}" == str(set_e_info.value)
9911000

9921001
with pytest.raises(aioredis.ResponseError) as list_e_info:
9931002
await list_task
994-
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
1003+
assert f"MOVED 7141 127.0.0.1:{instances[1].port}" == str(list_e_info.value)
9951004

9961005

9971006
@pytest.mark.parametrize("set_cluster_node_id", [True, False])
@@ -1004,8 +1013,8 @@ async def test_cluster_native_client(
10041013
# Start and configure cluster with 3 masters and 3 replicas
10051014
masters = [
10061015
df_factory.create(
1007-
port=BASE_PORT + i,
1008-
admin_port=BASE_PORT + i + 1000,
1016+
port=next(next_port),
1017+
admin_port=next(next_port),
10091018
cluster_node_id=f"master{i}" if set_cluster_node_id else "",
10101019
)
10111020
for i in range(3)
@@ -1017,10 +1026,10 @@ async def test_cluster_native_client(
10171026

10181027
replicas = [
10191028
df_factory.create(
1020-
port=BASE_PORT + 100 + i,
1021-
admin_port=BASE_PORT + i + 1100,
1029+
port=next(next_port),
1030+
admin_port=next(next_port),
10221031
cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
1023-
replicaof=f"localhost:{BASE_PORT + i}",
1032+
replicaof=f"localhost:{masters[i].port}",
10241033
)
10251034
for i in range(3)
10261035
]
@@ -1195,7 +1204,7 @@ async def test_random_keys():
11951204
async def test_config_consistency(df_factory: DflyInstanceFactory):
11961205
# Check slot migration from one node to another
11971206
instances = [
1198-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1207+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
11991208
]
12001209

12011210
df_factory.start_all(instances)
@@ -1245,8 +1254,8 @@ async def test_cluster_flushall_during_migration(
12451254
# Check data migration from one node to another
12461255
instances = [
12471256
df_factory.create(
1248-
port=BASE_PORT + i,
1249-
admin_port=BASE_PORT + i + 1000,
1257+
port=next(next_port),
1258+
admin_port=next(next_port),
12501259
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
12511260
logtostdout=True,
12521261
)
@@ -1298,8 +1307,8 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
12981307
# Check data migration from one node to another
12991308
instances = [
13001309
df_factory.create(
1301-
port=BASE_PORT + i,
1302-
admin_port=BASE_PORT + i + 1000,
1310+
port=next(next_port),
1311+
admin_port=next(next_port),
13031312
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
13041313
)
13051314
for i in range(2)
@@ -1378,7 +1387,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
13781387
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
13791388
async def test_migration_with_key_ttl(df_factory):
13801389
instances = [
1381-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1390+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
13821391
]
13831392

13841393
df_factory.start_all(instances)
@@ -1427,7 +1436,7 @@ async def test_migration_with_key_ttl(df_factory):
14271436
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0})
14281437
async def test_network_disconnect_during_migration(df_factory):
14291438
instances = [
1430-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1439+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
14311440
]
14321441

14331442
df_factory.start_all(instances)
@@ -1496,8 +1505,8 @@ async def test_cluster_fuzzymigration(
14961505
):
14971506
instances = [
14981507
df_factory.create(
1499-
port=BASE_PORT + i,
1500-
admin_port=BASE_PORT + i + 1000,
1508+
port=next(next_port),
1509+
admin_port=next(next_port),
15011510
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
15021511
serialization_max_chunk_size=huge_values,
15031512
replication_stream_output_limit=10,
@@ -1632,7 +1641,7 @@ async def test_all_finished():
16321641
async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
16331642
"""Check data migration from one node to another."""
16341643
instances = [
1635-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1644+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
16361645
]
16371646
df_factory.start_all(instances)
16381647

@@ -1690,7 +1699,7 @@ async def test_cluster_replication_migration(
16901699
and make sure the captures on the replicas are equal.
16911700
"""
16921701
instances = [
1693-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4)
1702+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(4)
16941703
]
16951704
df_factory.start_all(instances)
16961705

@@ -1767,7 +1776,7 @@ async def test_start_replication_during_migration(
17671776
in the end master_1 and replica_1 should have the same data
17681777
"""
17691778
instances = [
1770-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(3)
1779+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(3)
17711780
]
17721781
df_factory.start_all(instances)
17731782

@@ -1834,7 +1843,7 @@ async def test_snapshoting_during_migration(
18341843
The result should be the same: snapshot contains all the data that existed before migration
18351844
"""
18361845
instances = [
1837-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(2)
1846+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
18381847
]
18391848
df_factory.start_all(instances)
18401849

@@ -1904,7 +1913,7 @@ async def start_save():
19041913
async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
19051914
"""Check data migration from one node to another."""
19061915
instances = [
1907-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1916+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
19081917
]
19091918
df_factory.start_all(instances)
19101919

@@ -1965,7 +1974,7 @@ async def node1size0():
19651974
@pytest.mark.asyncio
19661975
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
19671976
instances = [
1968-
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
1977+
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
19691978
]
19701979
df_factory.start_all(instances)
19711980

@@ -2027,9 +2036,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
20272036
Send traffic before replication start and while replicating.
20282037
Promote the replica to master and check data consistency between cluster and single node.
20292038
"""
2030-
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
2039+
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
20312040
cluster_nodes = [
2032-
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
2041+
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
20332042
]
20342043

20352044
# Start instances and connect clients
@@ -2114,9 +2123,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
21142123
Promote replica to master
21152124
Compare cluster data and replica data
21162125
"""
2117-
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
2126+
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
21182127
cluster_nodes = [
2119-
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
2128+
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
21202129
]
21212130

21222131
# Start instances and connect clients
@@ -2228,7 +2237,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
22282237
Send traffic before replication start and while replicating.
22292238
Promote the replica to master and check data consistency between cluster and single dragonfly node.
22302239
"""
2231-
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
2240+
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
22322241

22332242
# Start instances and connect clients
22342243
df_factory.start_all([replica])
@@ -2286,7 +2295,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
22862295
Send more traffic
22872296
Promote the replica to master and check data consistency between cluster and single dragonfly node.
22882297
"""
2289-
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
2298+
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
22902299

22912300
# Start instances and connect clients
22922301
df_factory.start_all([replica])
@@ -2371,8 +2380,8 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
23712380
instances = [
23722381
df_factory.create(
23732382
maxmemory="15G",
2374-
port=BASE_PORT + i,
2375-
admin_port=BASE_PORT + i + 1000,
2383+
port=next(next_port),
2384+
admin_port=next(next_port),
23762385
vmodule="streamer=9",
23772386
)
23782387
for i in range(3)
@@ -2429,8 +2438,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
24292438
# Timeout set to 3 seconds because we must first saturate the socket before we get the timeout
24302439
instances = [
24312440
df_factory.create(
2432-
port=BASE_PORT + i,
2433-
admin_port=BASE_PORT + i + 1000,
2441+
port=next(next_port),
2442+
admin_port=next(next_port),
24342443
replication_timeout=3000,
24352444
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2",
24362445
)

0 commit comments

Comments
 (0)