Skip to content

Commit a9b420b

Browse files
ansd and mergify[bot]
authored and committed
Allow MQTT QoS 0 subscribers to reconnect
The solution in #10203 has the following issues: 1. Bindings can be left over in Mnesia table rabbit_durable_queue. One solution to 1. would be to first delete the old queue via `rabbit_amqqueue:internal_delete(Q, User, missing_owner)` and subsequently declare the new queue via `rabbit_amqqueue:internal_declare(Q, false)`. However, even then, it suffers from: 2. Race conditions between `rabbit_amqqueue:on_node_down/1` and `rabbit_mqtt_qos0_queue:declare/2`: `rabbit_amqqueue:on_node_down/1` could first read the queue records that need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1` could subsequently delete the re-created queue. Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete transient queues in one isolated transaction. Instead, it first reads queues and subsequently deletes queues in batches, making it prone to race conditions. Ideally, this commit would delete all rabbit_mqtt_qos0_queue queues of the node that has crashed, including their bindings. However, doing so in one transaction is risky as there may be millions of such queues and the current code path applies the same logic on all live nodes, resulting in conflicting transactions and therefore a long database operation. Hence, this commit uses the simplest approach which should still be safe: Do not remove rabbit_mqtt_qos0_queue queues if a node crashes. Other live nodes will continue to route to these dead queues. That should be okay, given that the rabbit_mqtt_qos0_queue clients auto confirm. Continuing to route, however, has the side effect that these dead queues count as a routing result for the AMQP 0.9.1 `mandatory` property. If an MQTT client re-connects to a live node with the same client ID, the new node will delete and then re-create the queue. Once the crashed node comes back online, it will clean up its leftover queues and bindings.
(cherry picked from commit 78b4fcc) # Conflicts: # deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
1 parent e7c98a8 commit a9b420b

File tree

3 files changed

+31
-18
lines changed

3 files changed

+31
-18
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1972,7 +1972,8 @@ on_node_down(Node) ->
19721972
{QueueNames, Deletions} ->
19731973
case length(QueueNames) of
19741974
0 -> ok;
1975-
_ -> rabbit_log:info("~tp transient queues from an old incarnation of node ~tp deleted in ~fs", [length(QueueNames), Node, Time/1000000])
1975+
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
1976+
[N, Node, Time / 1_000_000])
19761977
end,
19771978
notify_queue_binding_deletions(Deletions),
19781979
rabbit_core_metrics:queues_deleted(QueueNames),
@@ -1987,6 +1988,7 @@ filter_transient_queues_to_delete(Node) ->
19871988
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
19881989
andalso (not rabbit_amqqueue:is_replicated(Q)
19891990
orelse rabbit_amqqueue:is_dead_exclusive(Q))
1991+
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
19901992
end.
19911993

19921994
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ is_stateful() ->
6767
false.
6868

6969
-spec declare(amqqueue:amqqueue(), node()) ->
70-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
70+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
71+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
7172
declare(Q0, _Node) ->
7273
%% The queue gets persisted such that routing to this
7374
%% queue (via the topic exchange) works as usual.
@@ -84,6 +85,7 @@ declare(Q0, _Node) ->
8485
{arguments, amqqueue:get_arguments(Q0)},
8586
{user_who_performed_action, ActingUser}]),
8687
{new, Q};
88+
<<<<<<< HEAD
8789
{absent, OldQ, nodedown} ->
8890
%% This case body can be deleted once Mnesia is unsupported.
8991
OldPid = amqqueue:get_pid(OldQ),
@@ -98,6 +100,8 @@ declare(Q0, _Node) ->
98100
Other ->
99101
Other
100102
end;
103+
=======
104+
>>>>>>> 78b4fcc899 (Allow MQTT QoS 0 subscribers to reconnect)
101105
Other ->
102106
Other
103107
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,38 +1026,45 @@ rabbit_mqtt_qos0_queue(Config) ->
10261026
ok = emqtt:disconnect(Pub).
10271027

10281028
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1029-
Topic = atom_to_binary(?FUNCTION_NAME),
1029+
Topic1 = <<"t/1">>,
1030+
Topic2 = <<"t/2">>,
10301031
Pub = connect(<<"publisher">>, Config, 2, []),
10311032

10321033
SubscriberId = <<"subscriber">>,
10331034
Sub0 = connect(SubscriberId, Config, 0, []),
1034-
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1035-
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1036-
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1035+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
1036+
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
1037+
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
10371038

10381039
process_flag(trap_exit, true),
10391040
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1040-
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1041-
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1041+
ok = await_exit(Sub0),
1042+
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
10421043
timer:sleep(500),
1044+
%% Re-connect to a live node with same MQTT client ID.
10431045
Sub1 = connect(SubscriberId, Config, 1, []),
1044-
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1045-
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1046-
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1046+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
1047+
ok = emqtt:publish(Pub, Topic2, <<"m1">>, qos0),
1048+
ok = expect_publishes(Sub1, Topic2, [<<"m1">>]),
1049+
%% Since we started a new clean session, previous subscription should have been deleted.
1050+
ok = emqtt:publish(Pub, Topic1, <<"m2">>, qos0),
1051+
receive {publish, _} = Publish -> ct:fail({unexpected, Publish})
1052+
after 300 -> ok
1053+
end,
10471054

1048-
%% Start node 0 to have a majority for Khepri.
10491055
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
10501056
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1051-
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1052-
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1057+
%% This time, do not wait.
1058+
%% rabbit_amqqueue:on_node_down/1 may or may not have run.
10531059
Sub2 = connect(SubscriberId, Config, 2, []),
1054-
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1055-
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1056-
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1060+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic2, qos0),
1061+
ok = emqtt:publish(Pub, Topic2, <<"m3">>, qos0),
1062+
ok = expect_publishes(Sub2, Topic2, [<<"m3">>]),
10571063

10581064
ok = emqtt:disconnect(Sub2),
10591065
ok = emqtt:disconnect(Pub),
1060-
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1066+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
1067+
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
10611068

10621069
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
10631070
management_plugin_connection(Config) ->

0 commit comments

Comments
 (0)