@@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta,
710
710
_ ->
711
711
return (Meta , State0 , stream_not_found , [])
712
712
end ;
713
- apply (#{machine_version := Vsn } = Meta ,
714
- {nodeup , Node } = Cmd ,
713
+ apply (Meta , {nodeup , Node } = Cmd ,
715
714
#? MODULE {monitors = Monitors0 ,
716
715
streams = Streams0 ,
717
716
single_active_consumer = Sac0 } = State ) ->
@@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta,
735
734
{Ss #{Id => S }, E }
736
735
end , {Streams0 , Effects0 }, Streams0 ),
737
736
738
- {Sac1 , Effects2 } = case ? V5_OR_MORE (Vsn ) of
739
- true ->
740
- SacMod = sac_module (Meta ),
741
- SacMod :handle_node_reconnected (Node ,
742
- Sac0 , Effects1 );
743
- false ->
744
- {Sac0 , Effects1 }
745
- end ,
737
+
738
+ {Sac1 , Effects2 } = sac_handle_node_reconnected (Meta , Node , Sac0 , Effects1 ),
746
739
return (Meta , State #? MODULE {monitors = Monitors ,
747
740
streams = Streams ,
748
741
single_active_consumer = Sac1 }, ok , Effects2 );
@@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
2444
2437
sac_handle_connection_down (SacState , Pid , _Reason , _Vsn ) ->
2445
2438
? SAC_V4 :handle_connection_down (Pid , SacState ).
2446
2439
2440
+ sac_handle_node_reconnected (#{machine_version := Vsn } = Meta , Node ,
2441
+ Sac , Effects ) ->
2442
+ case ? V5_OR_MORE (Vsn ) of
2443
+ true ->
2444
+ SacMod = sac_module (Meta ),
2445
+ SacMod :handle_node_reconnected (Node ,
2446
+ Sac , Effects );
2447
+ false ->
2448
+ {Sac , Effects }
2449
+ end .
2450
+
2447
2451
sac_make_purge_nodes (Nodes ) ->
2448
2452
rabbit_stream_sac_coordinator :make_purge_nodes (Nodes ).
2449
2453
0 commit comments