@@ -1567,6 +1567,150 @@ async def test_cluster_replication_migration(
1567
1567
assert await seeder .compare (r1_capture , r2_node .instance .port )
1568
1568
1569
1569
1570
+ @dfly_args ({"proactor_threads" : 4 , "cluster_mode" : "yes" })
1571
+ async def test_start_replication_during_migration (
1572
+ df_factory : DflyInstanceFactory , df_seeder_factory : DflySeederFactory
1573
+ ):
1574
+ """
1575
+ Test replication with migration. Create the following setup:
1576
+
1577
+ master_1 do migration to master_2 and we start replication for master_1 during this migration
1578
+
1579
+ in the end master_1 and replica_1 should have the same data
1580
+ """
1581
+ instances = [
1582
+ df_factory .create (port = BASE_PORT + i , admin_port = BASE_PORT + 1000 + i ) for i in range (3 )
1583
+ ]
1584
+ df_factory .start_all (instances )
1585
+
1586
+ nodes = [await create_node_info (n ) for n in instances ]
1587
+ m1_node , r1_node , m2_node = nodes
1588
+ master_nodes = [m1_node , m2_node ]
1589
+
1590
+ m1_node .slots = [(0 , 16383 )]
1591
+ m1_node .replicas = [r1_node ]
1592
+ m2_node .slots = []
1593
+
1594
+ logging .debug ("Push initial config" )
1595
+ await push_config (
1596
+ json .dumps (generate_config (master_nodes )), [node .admin_client for node in nodes ]
1597
+ )
1598
+
1599
+ logging .debug ("create data" )
1600
+ seeder = df_seeder_factory .create (keys = 10000 , port = nodes [0 ].instance .port , cluster_mode = True )
1601
+ await seeder .run (target_deviation = 0.1 )
1602
+
1603
+ logging .debug ("start migration" )
1604
+ m1_node .migrations = [
1605
+ MigrationInfo ("127.0.0.1" , m2_node .instance .admin_port , [(2001 , 16383 )], m2_node .id )
1606
+ ]
1607
+ await push_config (
1608
+ json .dumps (generate_config (master_nodes )), [node .admin_client for node in nodes ]
1609
+ )
1610
+
1611
+ logging .debug ("start replication" )
1612
+ await r1_node .admin_client .execute_command (f"replicaof localhost { m1_node .instance .port } " )
1613
+
1614
+ await wait_available_async (r1_node .admin_client )
1615
+
1616
+ await wait_for_status (m1_node .admin_client , m2_node .id , "FINISHED" )
1617
+
1618
+ logging .debug ("finish migration" )
1619
+ m1_node .migrations = []
1620
+ m1_node .slots = [(0 , 2000 )]
1621
+ m2_node .migrations = []
1622
+ m2_node .slots = [(2001 , 16383 )]
1623
+
1624
+ await push_config (
1625
+ json .dumps (generate_config (master_nodes )), [node .admin_client for node in nodes ]
1626
+ )
1627
+
1628
+ await check_all_replicas_finished ([r1_node .client ], m1_node .client )
1629
+
1630
+ m1_capture = await seeder .capture (m1_node .instance .port )
1631
+
1632
+ assert await seeder .compare (m1_capture , r1_node .instance .port )
1633
+
1634
+
1635
+ @pytest .mark .parametrize ("migration_first" , [False , True ])
1636
+ @dfly_args ({"proactor_threads" : 4 , "cluster_mode" : "yes" , "dbfilename" : "snap_during_migration" })
1637
+ async def test_snapshoting_during_migration (
1638
+ df_factory : DflyInstanceFactory , df_seeder_factory : DflySeederFactory , migration_first : bool
1639
+ ):
1640
+ """
1641
+ Test saving snapshot during migration. Create the following setups:
1642
+
1643
+ 1) Start saving and then run migration simultaneously
1644
+ 2) Run migration and start saving simultaneously
1645
+
1646
+ The result should be the same: snapshot contains all the data that existed before migration
1647
+ """
1648
+ instances = [
1649
+ df_factory .create (port = BASE_PORT + i , admin_port = BASE_PORT + 1000 + i ) for i in range (2 )
1650
+ ]
1651
+ df_factory .start_all (instances )
1652
+
1653
+ nodes = [await create_node_info (n ) for n in instances ]
1654
+
1655
+ nodes [0 ].slots = [(0 , 16383 )]
1656
+ nodes [1 ].slots = []
1657
+
1658
+ logging .debug ("Push initial config" )
1659
+ await push_config (json .dumps (generate_config (nodes )), [node .admin_client for node in nodes ])
1660
+
1661
+ logging .debug ("create data" )
1662
+ seeder = df_seeder_factory .create (keys = 10000 , port = nodes [0 ].instance .port , cluster_mode = True )
1663
+ await seeder .run (target_deviation = 0.1 )
1664
+
1665
+ capture_before_migration = await seeder .capture (nodes [0 ].instance .port )
1666
+
1667
+ nodes [0 ].migrations = [
1668
+ MigrationInfo ("127.0.0.1" , nodes [1 ].instance .admin_port , [(0 , 16383 )], nodes [1 ].id )
1669
+ ]
1670
+
1671
+ async def start_migration ():
1672
+ logging .debug ("start migration" )
1673
+ await push_config (json .dumps (generate_config (nodes )), [node .admin_client for node in nodes ])
1674
+
1675
+ async def start_save ():
1676
+ logging .debug ("BGSAVE" )
1677
+ await nodes [0 ].client .execute_command (f"BGSAVE" )
1678
+
1679
+ if migration_first :
1680
+ await start_migration ()
1681
+ await asyncio .sleep (random .randint (0 , 10 ) / 100 )
1682
+ await start_save ()
1683
+ else :
1684
+ await start_save ()
1685
+ await asyncio .sleep (random .randint (0 , 10 ) / 100 )
1686
+ await start_migration ()
1687
+
1688
+ logging .debug ("wait for snapshot" )
1689
+ while await is_saving (nodes [0 ].client ):
1690
+ await asyncio .sleep (0.1 )
1691
+
1692
+ logging .debug ("wait migration finish" )
1693
+ await wait_for_status (nodes [0 ].admin_client , nodes [1 ].id , "FINISHED" )
1694
+
1695
+ logging .debug ("finish migration" )
1696
+ nodes [0 ].migrations = []
1697
+ nodes [0 ].slots = []
1698
+ nodes [0 ].migrations = []
1699
+ nodes [0 ].slots = [(0 , 16383 )]
1700
+
1701
+ await push_config (json .dumps (generate_config (nodes )), [node .admin_client for node in nodes ])
1702
+
1703
+ assert await seeder .compare (capture_before_migration , nodes [1 ].instance .port )
1704
+
1705
+ await nodes [1 ].client .execute_command (
1706
+ "DFLY" ,
1707
+ "LOAD" ,
1708
+ "snap_during_migration-summary.dfs" ,
1709
+ )
1710
+
1711
+ assert await seeder .compare (capture_before_migration , nodes [1 ].instance .port )
1712
+
1713
+
1570
1714
@dfly_args ({"proactor_threads" : 4 , "cluster_mode" : "yes" })
1571
1715
@pytest .mark .asyncio
1572
1716
async def test_cluster_migration_cancel (df_factory : DflyInstanceFactory ):
0 commit comments