@@ -288,15 +288,24 @@ async def test_cluster_slot_ownership_changes(df_local_factory):
288
288
289
289
@dfly_args ({"proactor_threads" : 4 , "cluster_mode" : "yes" })
290
290
async def test_cluster_native_client (df_local_factory ):
291
- # Start and configure cluster with 3 nodes
292
- nodes = [
291
+ # Start and configure cluster with 3 masters and 3 replicas
292
+ masters = [
293
293
df_local_factory .create (port = BASE_PORT + i , admin_port = BASE_PORT + i + 1000 )
294
294
for i in range (3 )
295
295
]
296
- df_local_factory .start_all (nodes )
297
- c_nodes = [aioredis .Redis (port = node .port ) for node in nodes ]
298
- c_nodes_admin = [aioredis .Redis (port = node .admin_port ) for node in nodes ]
299
- node_ids = await asyncio .gather (* (get_node_id (c ) for c in c_nodes_admin ))
296
+ df_local_factory .start_all (masters )
297
+ c_masters = [aioredis .Redis (port = master .port ) for master in masters ]
298
+ c_masters_admin = [aioredis .Redis (port = master .admin_port ) for master in masters ]
299
+ master_ids = await asyncio .gather (* (get_node_id (c ) for c in c_masters_admin ))
300
+
301
+ replicas = [
302
+ df_local_factory .create (port = BASE_PORT + 100 + i , admin_port = BASE_PORT + i + 1100 )
303
+ for i in range (3 )
304
+ ]
305
+ df_local_factory .start_all (replicas )
306
+ c_replicas = [aioredis .Redis (port = replica .port ) for replica in replicas ]
307
+ c_replicas_admin = [aioredis .Redis (port = replica .admin_port ) for replica in replicas ]
308
+ replica_ids = await asyncio .gather (* (get_node_id (c ) for c in c_replicas_admin ))
300
309
301
310
config = f"""
302
311
[
@@ -308,11 +317,17 @@ async def test_cluster_native_client(df_local_factory):
308
317
}}
309
318
],
310
319
"master": {{
311
- "id": "{ node_ids [0 ]} ",
320
+ "id": "{ master_ids [0 ]} ",
312
321
"ip": "localhost",
313
- "port": { nodes [0 ].port }
322
+ "port": { masters [0 ].port }
314
323
}},
315
- "replicas": []
324
+ "replicas": [
325
+ {{
326
+ "id": "{ replica_ids [0 ]} ",
327
+ "ip": "localhost",
328
+ "port": { replicas [0 ].port }
329
+ }}
330
+ ]
316
331
}},
317
332
{{
318
333
"slot_ranges": [
@@ -322,11 +337,17 @@ async def test_cluster_native_client(df_local_factory):
322
337
}}
323
338
],
324
339
"master": {{
325
- "id": "{ node_ids [1 ]} ",
340
+ "id": "{ master_ids [1 ]} ",
326
341
"ip": "localhost",
327
- "port": { nodes [1 ].port }
342
+ "port": { masters [1 ].port }
328
343
}},
329
- "replicas": []
344
+ "replicas": [
345
+ {{
346
+ "id": "{ replica_ids [1 ]} ",
347
+ "ip": "localhost",
348
+ "port": { replicas [1 ].port }
349
+ }}
350
+ ]
330
351
}},
331
352
{{
332
353
"slot_ranges": [
@@ -336,23 +357,34 @@ async def test_cluster_native_client(df_local_factory):
336
357
}}
337
358
],
338
359
"master": {{
339
- "id": "{ node_ids [2 ]} ",
360
+ "id": "{ master_ids [2 ]} ",
340
361
"ip": "localhost",
341
- "port": { nodes [2 ].port }
362
+ "port": { masters [2 ].port }
342
363
}},
343
- "replicas": []
364
+ "replicas": [
365
+ {{
366
+ "id": "{ replica_ids [2 ]} ",
367
+ "ip": "localhost",
368
+ "port": { replicas [2 ].port }
369
+ }}
370
+ ]
344
371
}}
345
372
]
346
373
"""
347
- await push_config (config , c_nodes_admin )
374
+ await push_config (config , c_masters_admin + c_replicas_admin )
348
375
349
- client = aioredis .RedisCluster (decode_responses = True , host = "localhost" , port = nodes [0 ].port )
376
+ client = aioredis .RedisCluster (decode_responses = True , host = "localhost" , port = masters [0 ].port )
350
377
351
378
for i in range (10_000 ):
352
379
key = 'key' + str (i )
353
380
assert await client .set (key , 'value' ) == True
354
381
assert await client .get (key ) == 'value'
355
382
383
+ # Make sure that getting a value from a replica works as well.
384
+ replica_response = await client .execute_command (
385
+ 'get' , 'key0' , target_nodes = aioredis .RedisCluster .REPLICAS )
386
+ assert 'value' in replica_response .values ()
387
+
356
388
# Push new config
357
389
config = f"""
358
390
[
@@ -364,11 +396,17 @@ async def test_cluster_native_client(df_local_factory):
364
396
}}
365
397
],
366
398
"master": {{
367
- "id": "{ node_ids [0 ]} ",
399
+ "id": "{ master_ids [0 ]} ",
368
400
"ip": "localhost",
369
- "port": { nodes [0 ].port }
401
+ "port": { masters [0 ].port }
370
402
}},
371
- "replicas": []
403
+ "replicas": [
404
+ {{
405
+ "id": "{ replica_ids [0 ]} ",
406
+ "ip": "localhost",
407
+ "port": { replicas [0 ].port }
408
+ }}
409
+ ]
372
410
}},
373
411
{{
374
412
"slot_ranges": [
@@ -378,11 +416,17 @@ async def test_cluster_native_client(df_local_factory):
378
416
}}
379
417
],
380
418
"master": {{
381
- "id": "{ node_ids [1 ]} ",
419
+ "id": "{ master_ids [1 ]} ",
382
420
"ip": "localhost",
383
- "port": { nodes [1 ].port }
421
+ "port": { masters [1 ].port }
384
422
}},
385
- "replicas": []
423
+ "replicas": [
424
+ {{
425
+ "id": "{ replica_ids [1 ]} ",
426
+ "ip": "localhost",
427
+ "port": { replicas [1 ].port }
428
+ }}
429
+ ]
386
430
}},
387
431
{{
388
432
"slot_ranges": [
@@ -392,15 +436,21 @@ async def test_cluster_native_client(df_local_factory):
392
436
}}
393
437
],
394
438
"master": {{
395
- "id": "{ node_ids [2 ]} ",
439
+ "id": "{ master_ids [2 ]} ",
396
440
"ip": "localhost",
397
- "port": { nodes [2 ].port }
441
+ "port": { masters [2 ].port }
398
442
}},
399
- "replicas": []
443
+ "replicas": [
444
+ {{
445
+ "id": "{ replica_ids [2 ]} ",
446
+ "ip": "localhost",
447
+ "port": { replicas [2 ].port }
448
+ }}
449
+ ]
400
450
}}
401
451
]
402
452
"""
403
- await push_config (config , c_nodes_admin )
453
+ await push_config (config , c_masters_admin + c_replicas_admin )
404
454
405
455
for i in range (10_000 ):
406
456
key = 'key' + str (i )
0 commit comments