Skip to content

Commit f3de471

Browse files
ansdkjnilsson
authored andcommitted
Change modified outcome behaviour
With the new quorum queue v4 improvements where a requeue counter was added in addition to the quorum queue delivery counter, the following sentence from #6292 (comment) doesn't apply anymore: > Also the case where delivery_failed=false|undefined requires the release of the > message without incrementing the delivery_count. Again this is not something > that our queues are able to do so again we have to reject without requeue. Therefore, we simplify the modified outcome behaviour: RabbitMQ will from now on only discard the message if the modified's undeliverable-here field is true.
1 parent 26c6332 commit f3de471

File tree

3 files changed

+67
-14
lines changed

3 files changed

+67
-14
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ header(first_acquirer = K,
193193
header(delivery_count = K,
194194
#amqp10_msg{header = #'v1_0.header'{delivery_count = D}}) ->
195195
header_value(K, D);
196-
header(K, #amqp10_msg{header = undefined}) -> header_value(K, undefined).
196+
header(K, #amqp10_msg{header = undefined}) ->
197+
header_value(K, undefined).
197198

198199
-spec delivery_annotations(amqp10_msg()) -> #{annotations_key() => any()}.
199200
delivery_annotations(#amqp10_msg{delivery_annotations = undefined}) ->

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1861,20 +1861,21 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
18611861
discard;
18621862
settle_op_from_outcome(#'v1_0.released'{}) ->
18631863
requeue;
1864-
%% Keep the same Modified behaviour as in RabbitMQ 3.x
1865-
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
1866-
undeliverable_here = UndelHere})
1867-
when UndelHere =/= true ->
1868-
requeue;
1869-
settle_op_from_outcome(#'v1_0.modified'{}) ->
1870-
%% If delivery_failed is not true, we can't increment its delivery_count.
1871-
%% So, we will have to reject without requeue.
1872-
%%
1873-
%% If undeliverable_here is true, this is not quite correct because
1874-
%% undeliverable_here refers to the link, and not the message in general.
1875-
%% However, we cannot filter messages from being assigned to individual consumers.
1876-
%% That's why we will have to reject it without requeue.
1864+
1865+
%% RabbitMQ does not support any of the modified outcome fields correctly.
1866+
%% However, we still allow the client to settle with the modified outcome
1867+
%% because some client libraries such as Apache QPid make use of it:
1868+
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
1869+
%% In such cases, it's better when RabbitMQ does not end the session.
1870+
%% See https://github.com/rabbitmq/rabbitmq-server/issues/6121
1871+
settle_op_from_outcome(#'v1_0.modified'{undeliverable_here = true}) ->
1872+
%% This is not quite correct because undeliverable_here refers to the link,
1873+
%% and not the message in general. However, RabbitMQ cannot filter messages from
1874+
%% being assigned to individual consumers. That's why we discard.
18771875
discard;
1876+
settle_op_from_outcome(#'v1_0.modified'{}) ->
1877+
requeue;
1878+
18781879
settle_op_from_outcome(Outcome) ->
18791880
protocol_error(
18801881
?V_1_0_AMQP_ERROR_INVALID_FIELD,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ groups() ->
4141
[
4242
reliable_send_receive_with_outcomes_classic_queue,
4343
reliable_send_receive_with_outcomes_quorum_queue,
44+
modified,
4445
sender_settle_mode_unsettled,
4546
sender_settle_mode_unsettled_fanout,
4647
sender_settle_mode_mixed,
@@ -402,6 +403,56 @@ reliable_send_receive(QType, Outcome, Config) ->
402403
ok = end_session_sync(Session2),
403404
ok = amqp10_client:close_connection(Connection2).
404405

406+
%% This test case doesn't expect the correct AMQP spec behavivour.
407+
%% We know that RabbitMQ doesn't implement the modified outcome correctly.
408+
%% Here, we test RabbitMQ's workaround behaviour:
409+
%% RabbitMQ discards if undeliverable-here is true. Otherwise, RabbitMQ requeues.
410+
modified(Config) ->
411+
QName = atom_to_binary(?FUNCTION_NAME),
412+
{Connection, Session, LinkPair} = init(Config),
413+
{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
414+
LinkPair, QName,
415+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
416+
Address = rabbitmq_amqp_address:queue(QName),
417+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
418+
ok = wait_for_credit(Sender),
419+
420+
Msg1 = amqp10_msg:new(<<"tag1">>, <<"m1">>, true),
421+
Msg2 = amqp10_msg:new(<<"tag2">>, <<"m2">>, true),
422+
ok = amqp10_client:send_msg(Sender, Msg1),
423+
ok = amqp10_client:send_msg(Sender, Msg2),
424+
ok = amqp10_client:detach_link(Sender),
425+
426+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
427+
428+
{ok, M1} = amqp10_client:get_msg(Receiver),
429+
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
430+
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, _UndeliverableHere = true, #{}}),
431+
432+
{ok, M2a} = amqp10_client:get_msg(Receiver),
433+
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
434+
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
435+
436+
{ok, M2b} = amqp10_client:get_msg(Receiver),
437+
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
438+
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
439+
440+
{ok, M2c} = amqp10_client:get_msg(Receiver),
441+
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
442+
ok = amqp10_client:settle_msg(Receiver, M2c, {modified, true, false, #{<<"key">> => <<"val">>}}),
443+
444+
{ok, M2d} = amqp10_client:get_msg(Receiver),
445+
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
446+
?assertEqual(0, amqp10_msg:header(delivery_count, M2d)),
447+
ok = amqp10_client:settle_msg(Receiver, M2d, modified),
448+
449+
ok = amqp10_client:detach_link(Receiver),
450+
?assertMatch({ok, #{message_count := 1}},
451+
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
452+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
453+
ok = end_session_sync(Session),
454+
ok = amqp10_client:close_connection(Connection).
455+
405456
%% Tests that confirmations are returned correctly
406457
%% when sending many messages async to a quorum queue.
407458
sender_settle_mode_unsettled(Config) ->

0 commit comments

Comments
 (0)