Skip to content

Commit f54f880

Browse files
authored
Merge pull request #10034 from rabbitmq/gh10007
QQ: ensure members that are started late have right config.
2 parents da423a6 + 3eb982c commit f54f880

File tree

2 files changed

+42
-22
lines changed

2 files changed

+42
-22
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,7 @@ start_cluster(Q) ->
239239
[QuorumSize, rabbit_misc:rs(QName), Leader]),
240240
case rabbit_amqqueue:internal_declare(NewQ1, false) of
241241
{created, NewQ} ->
242-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
243-
?TICK_TIMEOUT),
244-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
245-
?SNAPSHOT_INTERVAL),
246-
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
242+
RaConfs = [make_ra_conf(NewQ, ServerId)
247243
|| ServerId <- members(NewQ)],
248244
try erpc_call(Leader, ra, start_cluster,
249245
[?RA_SYSTEM, RaConfs, ?START_CLUSTER_TIMEOUT],
@@ -625,11 +621,10 @@ recover(_Vhost, Queues) ->
625621
lists:foldl(
626622
fun (Q0, {R0, F0}) ->
627623
{Name, _} = amqqueue:get_pid(Q0),
624+
ServerId = {Name, node()},
628625
QName = amqqueue:get_name(Q0),
629-
Nodes = get_nodes(Q0),
630-
Formatter = {?MODULE, format_ra_event, [QName]},
631-
Res = case ra:restart_server(?RA_SYSTEM, {Name, node()},
632-
#{ra_event_formatter => Formatter}) of
626+
MutConf = make_mutable_config(Q0),
627+
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
633628
ok ->
634629
% queue was restarted, good
635630
ok;
@@ -641,10 +636,7 @@ recover(_Vhost, Queues) ->
641636
[rabbit_misc:rs(QName), Err1]),
642637
% queue was never started on this node
643638
% so needs to be started from scratch.
644-
Machine = ra_machine(Q0),
645-
RaNodes = [{Name, Node} || Node <- Nodes],
646-
case ra:start_server(?RA_SYSTEM, Name, {Name, node()},
647-
Machine, RaNodes) of
639+
case start_server(make_ra_conf(Q0, ServerId)) of
648640
ok -> ok;
649641
Err2 ->
650642
rabbit_log:warning("recover: quorum queue ~w could not"
@@ -1195,11 +1187,7 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
11951187
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
11961188
ServerId = {RaName, Node},
11971189
Members = members(Q),
1198-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1199-
?TICK_TIMEOUT),
1200-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1201-
?SNAPSHOT_INTERVAL),
1202-
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership),
1190+
Conf = make_ra_conf(Q, ServerId, Membership),
12031191
case ra:start_server(?RA_SYSTEM, Conf) of
12041192
ok ->
12051193
ServerIdSpec = maps:with([id, uid, membership], Conf),
@@ -1742,8 +1730,15 @@ members(Q) when ?amqqueue_is_quorum(Q) ->
17421730
format_ra_event(ServerId, Evt, QRef) ->
17431731
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
17441732

1745-
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
1746-
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter).
1733+
make_ra_conf(Q, ServerId) ->
1734+
make_ra_conf(Q, ServerId, voter).
1735+
1736+
make_ra_conf(Q, ServerId, Membership) ->
1737+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1738+
?TICK_TIMEOUT),
1739+
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1740+
?SNAPSHOT_INTERVAL),
1741+
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership).
17471742

17481743
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership) ->
17491744
QName = amqqueue:get_name(Q),
@@ -1765,6 +1760,16 @@ make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership) ->
17651760
machine => RaMachine,
17661761
ra_event_formatter => Formatter}).
17671762

1763+
make_mutable_config(Q) ->
1764+
QName = amqqueue:get_name(Q),
1765+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1766+
?TICK_TIMEOUT),
1767+
Formatter = {?MODULE, format_ra_event, [QName]},
1768+
#{tick_timeout => TickTimeout,
1769+
ra_event_formatter => Formatter}.
1770+
1771+
1772+
17681773
get_nodes(Q) when ?is_amqqueue(Q) ->
17691774
#{nodes := Nodes} = amqqueue:get_type_state(Q),
17701775
Nodes.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1658,7 +1658,7 @@ channel_handles_ra_event(Config) ->
16581658

16591659
declare_during_node_down(Config) ->
16601660
[Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
1661-
Config, nodename),
1661+
Config, nodename),
16621662

16631663
stop_node(Config, DownServer),
16641664
Running = Servers -- [DownServer],
@@ -1684,7 +1684,20 @@ declare_during_node_down(Config) ->
16841684

16851685
publish(Ch, QQ),
16861686
wait_for_messages_ready(Servers, RaName, 1),
1687-
ok.
1687+
1688+
case rabbit_ct_helpers:is_mixed_versions() of
1689+
true ->
1690+
%% stop here if mixexd
1691+
ok;
1692+
false ->
1693+
%% further assertions that we can consume from the newly
1694+
%% started member
1695+
SubCh = rabbit_ct_client_helpers:open_channel(Config, DownServer),
1696+
subscribe(SubCh, QQ, false),
1697+
receive_and_ack(Ch),
1698+
wait_for_messages_ready(Servers, RaName, 0),
1699+
ok
1700+
end.
16881701

16891702
simple_confirm_availability_on_leader_change(Config) ->
16901703
[Node1, Node2, _Node3] = Servers =
@@ -2881,6 +2894,8 @@ receive_and_ack(Ch) ->
28812894
redelivered = false}, _} ->
28822895
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
28832896
multiple = false})
2897+
after 5000 ->
2898+
ct:fail("receive_and_ack timed out", [])
28842899
end.
28852900

28862901
message_ttl_policy(Config) ->

0 commit comments

Comments
 (0)