Skip to content

Commit 5d563d0

Browse files
committed
Fix at-least-once dead lettering when the target include the source.
If the target for at least once dead lettering included the source queue the dead letter outbound queue in the quorum queue would never be cleared. This changes the queue -> dead letter worker message format to better distinguish between those and queue events for "normal" queue type interactions.
1 parent 3ce37e1 commit 5d563d0

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ delivery_effects(CPid, Msgs0) ->
236236
Msgs = lists:zipwith(fun (Cmd, {Reason, MsgId}) ->
237237
{MsgId, {Reason, rabbit_fifo:get_msg(Cmd)}}
238238
end, Log, RsnIds),
239-
[{send_msg, CPid, {dlx_delivery, Msgs}, [ra_event]}]
239+
[{send_msg, CPid, {dlx_event, self(), {dlx_delivery, Msgs}}, [cast]}]
240240
end}].
241241

242242
-spec state_enter(ra_server:ra_state() | eol, rabbit_types:r('queue'), dead_letter_handler(), state()) ->
@@ -308,7 +308,7 @@ update_config(at_least_once, at_least_once, _, State) ->
308308
{State, []};
309309
Pid ->
310310
%% Notify rabbit_fifo_dlx_worker about potentially updated policies.
311-
{State, [{send_msg, Pid, lookup_topology, ra_event}]}
311+
{State, [{send_msg, Pid, {dlx_event, self(), lookup_topology}, cast}]}
312312
end;
313313
update_config(SameDLH, SameDLH, _, State) ->
314314
{State, []};

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ process_command(Cmd, #state{leader = Leader} = State, Tries) ->
5757
process_command(Cmd, State, Tries - 1)
5858
end.
5959

60-
-spec handle_ra_event(ra:server_id(), term(), state()) ->
60+
-spec handle_ra_event(pid(), term(), state()) ->
6161
{ok, state(), actions()}.
62-
handle_ra_event(Leader, {machine, {dlx_delivery, _} = Del}, #state{leader = Leader} = State) ->
62+
handle_ra_event(Leader, {dlx_delivery, _} = Del,
63+
#state{leader = _Leader} = State) when node(Leader) == node() ->
6364
handle_delivery(Del, State);
6465
handle_ra_event(From, Evt, State) ->
6566
rabbit_log:debug("Ignoring ra event ~tp from ~tp", [Evt, From]),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,20 +135,21 @@ handle_call(Request, From, State) ->
135135
rabbit_log:info("~ts received unhandled call from ~tp: ~tp", [?MODULE, From, Request]),
136136
{noreply, State}.
137137

138-
handle_cast({queue_event, QRef, {_From, {machine, lookup_topology}}},
139-
#state{queue_ref = QRef} = State0) ->
138+
handle_cast({dlx_event, _LeaderPid, {machine, lookup_topology}},
139+
#state{queue_ref = _} = State0) ->
140140
State = lookup_topology(State0),
141141
redeliver_and_ack(State);
142-
handle_cast({queue_event, QRef, {From, Evt}},
143-
#state{queue_ref = QRef,
142+
handle_cast({dlx_event, LeaderPid, Evt},
143+
#state{queue_ref = _QRef,
144144
dlx_client_state = DlxState0} = State0) ->
145145
%% received dead-letter message from source queue
146-
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(From, Evt, DlxState0),
146+
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0),
147147
State1 = State0#state{dlx_client_state = DlxState},
148148
State = handle_queue_actions(Actions, State1),
149149
{noreply, State};
150150
handle_cast({queue_event, QRef, Evt},
151151
#state{queue_type_state = QTypeState0} = State0) ->
152+
152153
case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
153154
{ok, QTypeState1, Actions} ->
154155
%% received e.g. confirm from target queue

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
-module(dead_lettering_SUITE).
1010

1111
-include_lib("common_test/include/ct.hrl").
12-
-include_lib("kernel/include/file.hrl").
1312
-include_lib("amqp_client/include/amqp_client.hrl").
1413
-include_lib("eunit/include/eunit.hrl").
1514
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
@@ -1034,6 +1033,7 @@ dead_letter_headers_cycle(Config) ->
10341033
publish(Ch, QName, [P]),
10351034
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
10361035
[DTag] = consume(Ch, QName, [P]),
1036+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10371037
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
10381038
multiple = false,
10391039
requeue = false}),
@@ -1044,6 +1044,7 @@ dead_letter_headers_cycle(Config) ->
10441044
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
10451045
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
10461046

1047+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10471048
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
10481049
multiple = false,
10491050
requeue = false}),

0 commit comments

Comments
 (0)