Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,176 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
assert False, "Should not be able to get key on non-owner cluster node"
except redis.exceptions.ResponseError as e:
assert re.match(r"MOVED \d+ localhost:1111", e.args[0])


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(df_local_factory):
    """Exercise a Dragonfly cluster through redis-py's cluster-aware client.

    Starts 3 masters + 3 replicas, pushes a slot-ownership config, and
    verifies SET/GET through ``aioredis.RedisCluster`` (which follows MOVED
    redirects on its own). Then pushes a second config that reshuffles slot
    ownership and verifies the client still reads/writes every key.
    """
    # Start and configure cluster with 3 masters and 3 replicas.
    masters = [
        df_local_factory.create(port=BASE_PORT+i, admin_port=BASE_PORT+i+1000)
        for i in range(3)
    ]
    df_local_factory.start_all(masters)
    c_masters = [aioredis.Redis(port=master.port) for master in masters]
    c_masters_admin = [aioredis.Redis(port=master.admin_port) for master in masters]
    master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))

    replicas = [
        df_local_factory.create(port=BASE_PORT+100+i, admin_port=BASE_PORT+i+1100)
        for i in range(3)
    ]
    df_local_factory.start_all(replicas)
    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
    c_replicas_admin = [aioredis.Redis(port=replica.admin_port) for replica in replicas]
    replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin))

    def generate_config(slot_boundaries):
        """Render the cluster-config JSON: shard i owns slot range
        ``slot_boundaries[i]`` and is served by master i with replica i.

        Factors out the two previously duplicated ~60-line config literals.
        """
        shards = []
        for i, (start, end) in enumerate(slot_boundaries):
            shards.append(f"""
      {{
        "slot_ranges": [
          {{
            "start": {start},
            "end": {end}
          }}
        ],
        "master": {{
          "id": "{master_ids[i]}",
          "ip": "localhost",
          "port": {masters[i].port}
        }},
        "replicas": [
          {{
            "id": "{replica_ids[i]}",
            "ip": "localhost",
            "port": {replicas[i].port}
          }}
        ]
      }}""")
        return "[" + ",".join(shards) + "\n    ]"

    await push_config(
        generate_config([(0, 5000), (5001, 10000), (10001, 16383)]),
        c_masters_admin + c_replicas_admin)

    client = aioredis.RedisCluster(decode_responses=True, host="localhost",
                                   port=masters[0].port)

    async def assert_set_get_all():
        """Write and read back a fixed set of keys through the cluster client.

        Sequential key names hash to many different slots, so this touches all
        shards. TODO(review): random keys would cover more of the slot space
        across repeated runs.
        """
        for i in range(10_000):
            key = 'key' + str(i)
            assert await client.set(key, 'value')
            assert await client.get(key) == 'value'

    await assert_set_get_all()

    # Make sure that getting a value from a replica works as well.
    # NOTE(review): there is no explicit wait for replicas to catch up before
    # this read; it relies on 'key0' (written first, long ago in the loop)
    # having been replicated by now — confirm, or add a wait-for-replication
    # step to rule out flakiness.
    replica_response = await client.execute_command(
        'get', 'key0', target_nodes=aioredis.RedisCluster.REPLICAS)
    assert 'value' in replica_response.values()

    # Push a new config that moves slot ownership around, then verify the
    # cluster client still handles every key correctly after the reshuffle.
    await push_config(
        generate_config([(0, 4000), (4001, 14000), (14001, 16383)]),
        c_masters_admin + c_replicas_admin)

    await assert_set_get_all()

    await client.close()