Skip to content

Commit 17c9acf

Browse files
Merge pull request #9034 from rabbitmq/mqtt-nack
Nack rejected messages to MQTT 5.0 client
2 parents 79e3c10 + 2a4301e commit 17c9acf

File tree

8 files changed

+207
-107
lines changed

8 files changed

+207
-107
lines changed

deps/rabbit/src/rabbit_confirms.erl

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,3 @@ next_smallest(S, U) when is_map_key(S, U) ->
144144
next_smallest(S, U) ->
145145
%% TODO: this is potentially infinitely recursive if called incorrectly
146146
next_smallest(S+1, U).
147-
148-
149-
150-
-ifdef(TEST).
151-
-include_lib("eunit/include/eunit.hrl").
152-
-endif.

deps/rabbit/test/rabbit_confirms_SUITE.erl

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
-module(rabbit_confirms_SUITE).
22

3-
-compile(export_all).
4-
5-
-export([
6-
]).
7-
8-
-include_lib("common_test/include/ct.hrl").
3+
-compile([export_all, nowarn_export_all]).
94
-include_lib("eunit/include/eunit.hrl").
105

116
%%%===================================================================
@@ -17,40 +12,13 @@ all() ->
1712
{group, tests}
1813
].
1914

20-
21-
all_tests() ->
22-
[
23-
confirm,
24-
reject,
25-
remove_queue
26-
].
27-
2815
groups() ->
2916
[
30-
{tests, [], all_tests()}
31-
].
32-
33-
init_per_suite(Config) ->
34-
Config.
35-
36-
end_per_suite(_Config) ->
37-
ok.
38-
39-
init_per_group(_Group, Config) ->
40-
Config.
41-
42-
end_per_group(_Group, _Config) ->
43-
ok.
44-
45-
init_per_testcase(_TestCase, Config) ->
46-
Config.
47-
48-
end_per_testcase(_TestCase, _Config) ->
49-
ok.
50-
51-
%%%===================================================================
52-
%%% Test cases
53-
%%%===================================================================
17+
{tests, [shuffle],
18+
[confirm,
19+
reject,
20+
remove_queue
21+
]}].
5422

5523
confirm(_Config) ->
5624
XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
@@ -93,7 +61,6 @@ confirm(_Config) ->
9361
?assertEqual(0, rabbit_confirms:size(U7)),
9462
?assertEqual(undefined, rabbit_confirms:smallest(U7)),
9563

96-
9764
U8 = rabbit_confirms:insert(2, [QName], XName, U1),
9865
{[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
9966
ok.
@@ -126,7 +93,6 @@ reject(_Config) ->
12693
{error, not_found} = rabbit_confirms:reject(2, U5),
12794
?assertEqual(1, rabbit_confirms:size(U5)),
12895
?assertEqual(1, rabbit_confirms:smallest(U5)),
129-
13096
ok.
13197

13298
remove_queue(_Config) ->
@@ -147,8 +113,4 @@ remove_queue(_Config) ->
147113
U5 = rabbit_confirms:insert(1, [QName], XName, U0),
148114
U6 = rabbit_confirms:insert(2, [QName], XName, U5),
149115
{[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
150-
151116
ok.
152-
153-
154-
%% Utility

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,14 @@ rabbitmq_suite(
281281
],
282282
)
283283

284+
rabbitmq_suite(
285+
name = "rabbit_mqtt_confirms_SUITE",
286+
size = "small",
287+
deps = [
288+
"//deps/rabbit_common:erlang_app",
289+
],
290+
)
291+
284292
assert_suites()
285293

286294
alias(

deps/rabbitmq_mqtt/app.bzl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
331331
erlc_opts = "//:test_erlc_opts",
332332
deps = ["//deps/amqp_client:erlang_app"],
333333
)
334+
erlang_bytecode(
335+
name = "rabbit_mqtt_confirms_SUITE_beam_files",
336+
testonly = True,
337+
srcs = ["test/rabbit_mqtt_confirms_SUITE.erl"],
338+
outs = ["test/rabbit_mqtt_confirms_SUITE.beam"],
339+
app_name = "rabbitmq_mqtt",
340+
erlc_opts = "//:test_erlc_opts",
341+
)

deps/rabbitmq_mqtt/src/rabbit_mqtt_confirms.erl

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,21 @@ insert(PktId, QNames, State)
4949
-spec confirm([packet_id()], queue_name(), state()) ->
5050
{[packet_id()], state()}.
5151
confirm(PktIds, QName, State0) ->
52-
{L0, State} = lists:foldl(fun(PktId, Acc) ->
53-
confirm_one(PktId, QName, Acc)
54-
end, {[], State0}, PktIds),
55-
L = lists:reverse(L0),
56-
{L, State}.
52+
lists:foldl(fun(PktId, Acc) ->
53+
confirm_one(PktId, QName, Acc)
54+
end, {[], State0}, PktIds).
5755

58-
-spec reject(packet_id(), state()) ->
59-
{ok, state()} | {error, not_found}.
60-
reject(PktId, State0)
61-
when is_integer(PktId) ->
62-
case maps:take(PktId, State0) of
63-
{_, State} ->
64-
{ok, State};
65-
error ->
66-
{error, not_found}
67-
end.
56+
-spec reject([packet_id()], state()) ->
57+
{[packet_id()], state()}.
58+
reject(PktIds, State0) ->
59+
lists:foldl(fun(PktId, Acc = {Rejs, S0}) ->
60+
case maps:take(PktId, S0) of
61+
{_, S} ->
62+
{[PktId | Rejs], S};
63+
error ->
64+
Acc
65+
end
66+
end, {[], State0}, PktIds).
6867

6968
%% idempotent
7069
-spec remove_queue(queue_name(), state()) ->
@@ -77,7 +76,7 @@ remove_queue(QName, State) ->
7776
(_, _, PktIds) ->
7877
PktIds
7978
end, [], State),
80-
confirm(lists:sort(PktIds), QName, State).
79+
confirm(PktIds, QName, State).
8180

8281
%% INTERNAL
8382

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,17 +1686,15 @@ process_routing_confirm(#delivery{confirm = true,
16861686
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
16871687
State#state{unacked_client_pubs = U}.
16881688

1689-
-spec send_puback(list(packet_id()), state()) -> ok.
1690-
send_puback(PktIds0, State)
1689+
-spec send_puback(packet_id() | list(packet_id()), reason_code(), state()) -> ok.
1690+
send_puback(PktIds0, ReasonCode, State)
16911691
when is_list(PktIds0) ->
16921692
%% Classic queues confirm messages unordered.
16931693
%% Let's sort them here assuming most MQTT clients send with an increasing packet identifier.
16941694
PktIds = lists:usort(PktIds0),
16951695
lists:foreach(fun(Id) ->
1696-
send_puback(Id, ?RC_SUCCESS, State)
1697-
end, PktIds).
1698-
1699-
-spec send_puback(packet_id(), reason_code(), state()) -> ok.
1696+
send_puback(Id, ReasonCode, State)
1697+
end, PktIds);
17001698
send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
17011699
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
17021700
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK},
@@ -1971,7 +1969,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
19711969
QStates = rabbit_queue_type:remove(QRef, QStates1),
19721970
State = State0#state{queue_states = QStates,
19731971
unacked_client_pubs = U},
1974-
send_puback(ConfirmPktIds, State),
1972+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
19751973
{ok, State}
19761974
end.
19771975

@@ -2001,7 +1999,7 @@ handle_queue_event({queue_event, QName, Evt},
20011999
QStates = rabbit_queue_type:remove(QName, QStates0),
20022000
State = State1#state{queue_states = QStates,
20032001
unacked_client_pubs = U},
2004-
send_puback(ConfirmPktIds, State),
2002+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
20052003
{ok, State};
20062004
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
20072005
{error, Error, State0}
@@ -2013,19 +2011,21 @@ handle_queue_actions(Actions, #state{} = State0) ->
20132011
deliver_to_client(Msgs, Ack, S);
20142012
({settled, QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
20152013
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
2016-
send_puback(ConfirmPktIds, S),
2017-
S#state{unacked_client_pubs = U};
2018-
({rejected, _QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
2019-
%% Negative acks are supported in MQTT v5 only.
2020-
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
2021-
U = lists:foldl(
2022-
fun(PktId, Acc0) ->
2023-
case rabbit_mqtt_confirms:reject(PktId, Acc0) of
2024-
{ok, Acc} -> Acc;
2025-
{error, not_found} -> Acc0
2026-
end
2027-
end, U0, PktIds),
2014+
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
20282015
S#state{unacked_client_pubs = U};
2016+
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
2017+
cfg = #cfg{proto_ver = ProtoVer}}) ->
2018+
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
2019+
S = S0#state{unacked_client_pubs = U},
2020+
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
2021+
%% rejected messages since we can only (but must not) send a positive ack.
2022+
case ProtoVer of
2023+
?MQTT_PROTO_V5 ->
2024+
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
2025+
_ ->
2026+
ok
2027+
end,
2028+
S;
20292029
({block, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
20302030
S#state{queues_soft_limit_exceeded = sets:add_element(QName, QSLE)};
20312031
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-module(rabbit_mqtt_confirms_SUITE).
2+
3+
-compile([export_all, nowarn_export_all]).
4+
-include_lib("eunit/include/eunit.hrl").
5+
6+
%%%===================================================================
7+
%%% Common Test callbacks
8+
%%%===================================================================
9+
10+
all() ->
11+
[
12+
{group, tests}
13+
].
14+
15+
groups() ->
16+
[
17+
{tests, [shuffle],
18+
[confirm,
19+
reject,
20+
remove_queue
21+
]}].
22+
23+
-define(MOD, rabbit_mqtt_confirms).
24+
25+
confirm(_Config) ->
26+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
27+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
28+
U0 = ?MOD:init(),
29+
?assertEqual(0, ?MOD:size(U0)),
30+
31+
U1 = ?MOD:insert(1, [QName], U0),
32+
?assertEqual(1, ?MOD:size(U1)),
33+
?assert(?MOD:contains(1, U1)),
34+
35+
{[1], U2} = ?MOD:confirm([1], QName, U1),
36+
?assertEqual(0, ?MOD:size(U2)),
37+
?assertNot(?MOD:contains(1, U2)),
38+
39+
U3 = ?MOD:insert(2, [QName], U1),
40+
?assertEqual(2, ?MOD:size(U3)),
41+
?assert(?MOD:contains(1, U3)),
42+
?assert(?MOD:contains(2, U3)),
43+
44+
{[1], U4} = ?MOD:confirm([1], QName, U3),
45+
?assertEqual(1, ?MOD:size(U4)),
46+
?assertNot(?MOD:contains(1, U4)),
47+
?assert(?MOD:contains(2, U4)),
48+
49+
U5 = ?MOD:insert(2, [QName, QName2], U1),
50+
?assertEqual(2, ?MOD:size(U5)),
51+
?assert(?MOD:contains(1, U5)),
52+
?assert(?MOD:contains(2, U5)),
53+
54+
{[1], U6} = ?MOD:confirm([1, 2], QName, U5),
55+
{[2], U7} = ?MOD:confirm([2], QName2, U6),
56+
?assertEqual(0, ?MOD:size(U7)),
57+
58+
U8 = ?MOD:insert(2, [QName], U1),
59+
{Confirmed, _U9} = ?MOD:confirm([1, 2], QName, U8),
60+
?assertEqual([1, 2], lists:sort(Confirmed)),
61+
ok.
62+
63+
64+
reject(_Config) ->
65+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
66+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
67+
U0 = ?MOD:init(),
68+
?assertEqual(0, ?MOD:size(U0)),
69+
70+
U1 = ?MOD:insert(1, [QName], U0),
71+
?assert(?MOD:contains(1, U1)),
72+
73+
{[1], U2} = ?MOD:reject([1], U1),
74+
{[], U2} = ?MOD:reject([1], U2),
75+
?assertEqual(0, ?MOD:size(U2)),
76+
?assertNot(?MOD:contains(1, U2)),
77+
78+
U3 = ?MOD:insert(2, [QName, QName2], U1),
79+
80+
{[1], U4} = ?MOD:reject([1], U3),
81+
{[], U4} = ?MOD:reject([1], U4),
82+
?assertEqual(1, ?MOD:size(U4)),
83+
84+
{[2], U5} = ?MOD:reject([2], U3),
85+
{[], U5} = ?MOD:reject([2], U5),
86+
?assertEqual(1, ?MOD:size(U5)),
87+
?assert(?MOD:contains(1, U5)),
88+
ok.
89+
90+
remove_queue(_Config) ->
91+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
92+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
93+
U0 = ?MOD:init(),
94+
95+
U1 = ?MOD:insert(1, [QName, QName2], U0),
96+
U2 = ?MOD:insert(2, [QName2], U1),
97+
{[2], U3} = ?MOD:remove_queue(QName2, U2),
98+
?assertEqual(1, ?MOD:size(U3)),
99+
?assert(?MOD:contains(1, U3)),
100+
{[1], U4} = ?MOD:remove_queue(QName, U3),
101+
?assertEqual(0, ?MOD:size(U4)),
102+
?assertNot(?MOD:contains(1, U4)),
103+
104+
U5 = ?MOD:insert(1, [QName], U0),
105+
U6 = ?MOD:insert(2, [QName], U5),
106+
{Confirmed, U7} = ?MOD:remove_queue(QName, U6),
107+
?assertEqual([1, 2], lists:sort(Confirmed)),
108+
?assertEqual(0, ?MOD:size(U7)),
109+
ok.

0 commit comments

Comments
 (0)