Skip to content

Commit 4bd8c1f

Browse files
committed
First pass SAC consumer priority implementation.
Single active consumers will be activated if they have a higher priority than the currently active consumer. if the currently active consumer has pending messages, no further messages will be assigned to the consumer and the activation of the new consumer will happen once all pending messages are settled. This is to ensure processing order. Consumers with the same priority will internally be ordered to favour those with credit then those that attached first. QQ: add SAC consumer priority integration tests Dialyzer fix QQ: add check for ff in tests
1 parent bb67af8 commit 4bd8c1f

File tree

6 files changed

+813
-195
lines changed

6 files changed

+813
-195
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 162 additions & 85 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
-type consumer_meta() :: #{ack => boolean(),
8888
username => binary(),
8989
prefetch => non_neg_integer(),
90-
args => list()
90+
args => list(),
91+
priority => non_neg_integer()
9192
% %% set if and only if credit API v2 is in use
9293
% initial_delivery_count => rabbit_queue_type:delivery_count()
9394
}.
@@ -122,7 +123,7 @@
122123

123124
-record(consumer,
124125
{cfg = #consumer_cfg{},
125-
status = up :: up | suspected_down | cancelled | waiting,
126+
status = up :: up | suspected_down | cancelled | fading,
126127
next_msg_id = 0 :: msg_id(),
127128
checked_out = #{} :: #{msg_id() => msg()},
128129
%% max number of messages that can be sent

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,10 +856,16 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
856856
0
857857
end,
858858

859+
Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
860+
{_Key, Value} ->
861+
Value;
862+
_ -> 0
863+
end,
859864
ConsumerMeta = #{ack => AckRequired,
860865
prefetch => Prefetch,
861866
args => Args,
862-
username => ActingUser},
867+
username => ActingUser,
868+
priority => Priority},
863869
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
864870
Mode, ConsumerMeta,
865871
QState0),

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 197 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -46,68 +46,70 @@ groups() ->
4646
{unclustered, [], [
4747
{uncluster_size_2, [], [add_member]}
4848
]},
49-
{clustered, [], [
50-
{cluster_size_2, [], [add_member_2,
51-
add_member_not_running,
52-
add_member_classic,
53-
add_member_wrong_type,
54-
add_member_already_a_member,
55-
add_member_not_found,
56-
delete_member_not_running,
57-
delete_member_classic,
58-
delete_member_wrong_type,
59-
delete_member_queue_not_found,
60-
delete_member,
61-
delete_member_not_a_member,
62-
delete_member_member_already_deleted,
63-
node_removal_is_quorum_critical]
64-
++ memory_tests()},
65-
{cluster_size_3, [], [
66-
cleanup_data_dir,
67-
channel_handles_ra_event,
68-
declare_during_node_down,
69-
simple_confirm_availability_on_leader_change,
70-
publishing_to_unavailable_queue,
71-
confirm_availability_on_leader_change,
72-
recover_from_single_failure,
73-
recover_from_multiple_failures,
74-
leadership_takeover,
75-
delete_declare,
76-
delete_member_during_node_down,
77-
metrics_cleanup_on_leadership_takeover,
78-
metrics_cleanup_on_leader_crash,
79-
consume_in_minority,
80-
reject_after_leader_transfer,
81-
shrink_all,
82-
rebalance,
83-
file_handle_reservations,
84-
file_handle_reservations_above_limit,
85-
node_removal_is_not_quorum_critical,
86-
leader_locator_client_local,
87-
leader_locator_balanced,
88-
leader_locator_balanced_maintenance,
89-
leader_locator_balanced_random_maintenance,
90-
leader_locator_policy,
91-
status,
92-
format,
93-
add_member_2
94-
]
95-
++ all_tests()},
96-
{cluster_size_5, [], [start_queue,
97-
start_queue_concurrent,
98-
quorum_cluster_size_3,
99-
quorum_cluster_size_7,
100-
node_removal_is_not_quorum_critical,
101-
select_nodes_with_least_replicas,
102-
select_nodes_with_least_replicas_node_down
103-
]},
104-
{clustered_with_partitions, [],
105-
[
106-
reconnect_consumer_and_publish,
107-
reconnect_consumer_and_wait,
108-
reconnect_consumer_and_wait_channel_down
109-
]}
110-
]}
49+
{clustered, [],
50+
[
51+
{cluster_size_2, [], [add_member_2,
52+
add_member_not_running,
53+
add_member_classic,
54+
add_member_wrong_type,
55+
add_member_already_a_member,
56+
add_member_not_found,
57+
delete_member_not_running,
58+
delete_member_classic,
59+
delete_member_wrong_type,
60+
delete_member_queue_not_found,
61+
delete_member,
62+
delete_member_not_a_member,
63+
node_removal_is_quorum_critical]
64+
++ memory_tests()},
65+
{cluster_size_3, [], [
66+
cleanup_data_dir,
67+
channel_handles_ra_event,
68+
declare_during_node_down,
69+
simple_confirm_availability_on_leader_change,
70+
publishing_to_unavailable_queue,
71+
confirm_availability_on_leader_change,
72+
recover_from_single_failure,
73+
recover_from_multiple_failures,
74+
leadership_takeover,
75+
delete_declare,
76+
delete_member_during_node_down,
77+
metrics_cleanup_on_leadership_takeover,
78+
metrics_cleanup_on_leader_crash,
79+
consume_in_minority,
80+
reject_after_leader_transfer,
81+
shrink_all,
82+
rebalance,
83+
file_handle_reservations,
84+
file_handle_reservations_above_limit,
85+
node_removal_is_not_quorum_critical,
86+
leader_locator_client_local,
87+
leader_locator_balanced,
88+
leader_locator_balanced_maintenance,
89+
leader_locator_balanced_random_maintenance,
90+
leader_locator_policy,
91+
status,
92+
format,
93+
add_member_2,
94+
single_active_consumer_priority_take_over,
95+
single_active_consumer_priority
96+
]
97+
++ all_tests()},
98+
{cluster_size_5, [], [start_queue,
99+
start_queue_concurrent,
100+
quorum_cluster_size_3,
101+
quorum_cluster_size_7,
102+
node_removal_is_not_quorum_critical,
103+
select_nodes_with_least_replicas,
104+
select_nodes_with_least_replicas_node_down
105+
]},
106+
{clustered_with_partitions, [],
107+
[
108+
reconnect_consumer_and_publish,
109+
reconnect_consumer_and_wait,
110+
reconnect_consumer_and_wait_channel_down
111+
]}
112+
]}
111113
].
112114

113115
all_tests() ->
@@ -923,6 +925,7 @@ publish_confirm(Ch, QName, Timeout) ->
923925
ct:pal("NOT CONFIRMED! ~ts", [QName]),
924926
fail
925927
after Timeout ->
928+
flush(1),
926929
exit(confirm_timeout)
927930
end.
928931

@@ -971,6 +974,120 @@ consume_in_minority(Config) ->
971974
ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server2),
972975
ok.
973976

977+
single_active_consumer_priority_take_over(Config) ->
978+
check_quorum_queues_v4_compat(Config),
979+
980+
[Server0, Server1, _Server2] =
981+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
982+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
983+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
984+
QName = ?config(queue_name, Config),
985+
Q1 = <<QName/binary, "_1">>,
986+
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
987+
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
988+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
989+
{<<"x-single-active-consumer">>, bool, true}],
990+
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
991+
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]),
992+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
993+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
994+
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
995+
publish_confirm(Ch2, Q1),
996+
%% higher priority consumer attaches
997+
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]),
998+
999+
%% Q1 should still have Ch1 as consumer as it has pending messages
1000+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
1001+
rpc:call(Server0, ra, local_query,
1002+
[RaNameQ1, QueryFun])),
1003+
1004+
%% ack the message
1005+
receive
1006+
{#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>,
1007+
delivery_tag = DeliveryTag}, _} ->
1008+
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
1009+
multiple = false})
1010+
after 5000 ->
1011+
flush(1),
1012+
exit(basic_deliver_timeout)
1013+
end,
1014+
1015+
?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _},
1016+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]),
1017+
?DEFAULT_AWAIT),
1018+
ok.
1019+
1020+
single_active_consumer_priority(Config) ->
1021+
check_quorum_queues_v4_compat(Config),
1022+
[Server0, Server1, Server2] =
1023+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1024+
1025+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
1026+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1027+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server2),
1028+
QName = ?config(queue_name, Config),
1029+
Q1 = <<QName/binary, "_1">>,
1030+
Q2 = <<QName/binary, "_2">>,
1031+
Q3 = <<QName/binary, "_3">>,
1032+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
1033+
{<<"x-single-active-consumer">>, bool, true}],
1034+
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
1035+
?assertEqual({'queue.declare_ok', Q2, 0, 0}, declare(Ch2, Q2, Args)),
1036+
?assertEqual({'queue.declare_ok', Q3, 0, 0}, declare(Ch3, Q3, Args)),
1037+
1038+
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 3}]),
1039+
ok = subscribe(Ch1, Q2, false, <<"ch1-ctag2">>, [{"x-priority", byte, 2}]),
1040+
ok = subscribe(Ch1, Q3, false, <<"ch1-ctag3">>, [{"x-priority", byte, 1}]),
1041+
1042+
1043+
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 1}]),
1044+
ok = subscribe(Ch2, Q2, false, <<"ch2-ctag2">>, [{"x-priority", byte, 3}]),
1045+
ok = subscribe(Ch2, Q3, false, <<"ch2-ctag3">>, [{"x-priority", byte, 2}]),
1046+
1047+
ok = subscribe(Ch3, Q1, false, <<"ch3-ctag1">>, [{"x-priority", byte, 2}]),
1048+
ok = subscribe(Ch3, Q2, false, <<"ch3-ctag2">>, [{"x-priority", byte, 1}]),
1049+
ok = subscribe(Ch3, Q3, false, <<"ch3-ctag3">>, [{"x-priority", byte, 3}]),
1050+
1051+
1052+
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
1053+
RaNameQ2 = binary_to_atom(<<"%2F", "_", Q2/binary>>, utf8),
1054+
RaNameQ3 = binary_to_atom(<<"%2F", "_", Q3/binary>>, utf8),
1055+
%% assert each queue has a different consumer
1056+
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
1057+
1058+
%% Q1 should have the consumer on Ch1
1059+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
1060+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
1061+
1062+
%% Q2 Ch2
1063+
?assertMatch({ok, {_, {value, {<<"ch2-ctag2">>, _}}}, _},
1064+
rpc:call(Server1, ra, local_query, [RaNameQ2, QueryFun])),
1065+
1066+
%% Q3 Ch3
1067+
?assertMatch({ok, {_, {value, {<<"ch3-ctag3">>, _}}}, _},
1068+
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),
1069+
1070+
%% close Ch3
1071+
_ = rabbit_ct_client_helpers:close_channel(Ch3),
1072+
flush(100),
1073+
1074+
%% assert Q3 has Ch2 (priority 2) as consumer
1075+
?assertMatch({ok, {_, {value, {<<"ch2-ctag3">>, _}}}, _},
1076+
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),
1077+
1078+
%% close Ch2
1079+
_ = rabbit_ct_client_helpers:close_channel(Ch2),
1080+
flush(100),
1081+
1082+
%% assert all queues as has Ch1 as consumer
1083+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
1084+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
1085+
?assertMatch({ok, {_, {value, {<<"ch1-ctag2">>, _}}}, _},
1086+
rpc:call(Server0, ra, local_query, [RaNameQ2, QueryFun])),
1087+
?assertMatch({ok, {_, {value, {<<"ch1-ctag3">>, _}}}, _},
1088+
rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])),
1089+
ok.
1090+
9741091
reject_after_leader_transfer(Config) ->
9751092
[Server0, Server1, Server2] =
9761093
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -3649,13 +3766,20 @@ consume_empty(Ch, Queue, NoAck) ->
36493766
no_ack = NoAck})).
36503767

36513768
subscribe(Ch, Queue, NoAck) ->
3769+
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).
3770+
3771+
subscribe(Ch, Queue, NoAck, Tag, Args) ->
36523772
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
36533773
no_ack = NoAck,
3654-
consumer_tag = <<"ctag">>},
3774+
arguments = Args,
3775+
consumer_tag = Tag},
36553776
self()),
36563777
receive
3657-
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
3778+
#'basic.consume_ok'{consumer_tag = Tag} ->
36583779
ok
3780+
after 30000 ->
3781+
flush(100),
3782+
exit(subscribe_timeout)
36593783
end.
36603784

36613785
qos(Ch, Prefetch, Global) ->
@@ -3768,3 +3892,12 @@ basic_get(Ch, Q, NoAck, Attempt) ->
37683892
timer:sleep(100),
37693893
basic_get(Ch, Q, NoAck, Attempt - 1)
37703894
end.
3895+
3896+
check_quorum_queues_v4_compat(Config) ->
3897+
case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config,
3898+
quorum_queues_4) of
3899+
false ->
3900+
throw({skip, "test needs feature flag quorum_queues_v4"});
3901+
true ->
3902+
ok
3903+
end.

0 commit comments

Comments
 (0)