Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ header(first_acquirer = K,
header(delivery_count = K,
#amqp10_msg{header = #'v1_0.header'{delivery_count = D}}) ->
header_value(K, D);
header(K, #amqp10_msg{header = undefined}) -> header_value(K, undefined).
header(K, #amqp10_msg{header = undefined}) ->
header_value(K, undefined).

-spec delivery_annotations(amqp10_msg()) -> #{annotations_key() => any()}.
delivery_annotations(#amqp10_msg{delivery_annotations = undefined}) ->
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,9 @@ rabbitmq_suite(
rabbitmq_suite(
name = "rabbit_fifo_int_SUITE",
size = "medium",
additional_beam = [
":test_test_util_beam",
],
deps = [
"//deps/rabbit_common:erlang_app",
"@aten//:erlang_app",
Expand All @@ -722,6 +725,7 @@ rabbitmq_suite(
],
deps = [
"//deps/rabbit_common:erlang_app",
"@meck//:erlang_app",
"@proper//:erlang_app",
"@ra//:erlang_app",
],
Expand All @@ -735,6 +739,15 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "rabbit_fifo_q_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
"@proper//:erlang_app",
],
)

rabbitmq_integration_suite(
name = "rabbit_fifo_dlx_integration_SUITE",
size = "medium",
Expand Down
20 changes: 19 additions & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -399,8 +401,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -541,6 +545,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx.hrl",
"src/rabbit_fifo_v0.hrl",
"src/rabbit_fifo_v1.hrl",
"src/rabbit_fifo_v3.hrl",
"src/rabbit_stream_coordinator.hrl",
"src/rabbit_stream_sac_coordinator.hrl",
],
Expand Down Expand Up @@ -672,8 +677,10 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -1288,7 +1295,8 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
testonly = True,
srcs = ["test/rabbit_fifo_SUITE.erl"],
outs = ["test/rabbit_fifo_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
hdrs = ["src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
Expand Down Expand Up @@ -2142,3 +2150,13 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_q_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_fifo_q_SUITE.erl"],
outs = ["test/rabbit_fifo_q_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["@proper//:erlang_app"],
)
4 changes: 2 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ record_death(Reason, SourceQueue,
routing_keys = RKeys,
count = 1,
anns = DeathAnns},
ReasonBin = atom_to_binary(Reason),
Anns = case Anns0 of
#{deaths := Deaths0} ->
Deaths = case Deaths0 of
Expand All @@ -406,7 +407,7 @@ record_death(Reason, SourceQueue,
[{Key, NewDeath} | Deaths0]
end
end,
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
Anns0#{<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
deaths := Deaths};
Expand All @@ -419,7 +420,6 @@ record_death(Reason, SourceQueue,
_ ->
[{Key, NewDeath}]
end,
ReasonBin = atom_to_binary(Reason),
Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
Expand Down
40 changes: 17 additions & 23 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,7 @@ get_property(priority, Msg) ->
-spec protocol_state(state(), mc:annotations()) -> iolist().
protocol_state(Msg0 = #msg_body_decoded{header = Header0,
message_annotations = MA0}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Msg = Msg0#msg_body_decoded{header = Header,
message_annotations = MA},
Expand All @@ -238,14 +231,7 @@ protocol_state(Msg0 = #msg_body_decoded{header = Header0,
protocol_state(#msg_body_encoded{header = Header0,
message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter];
Expand All @@ -269,10 +255,9 @@ protocol_state(#v1{message_annotations = MA0,
_ ->
undefined
end,
Header = #'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl,
first_acquirer = first_acquirer(Anns)},
Header = update_header_from_anns(#'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
Expand Down Expand Up @@ -573,13 +558,22 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).

-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
update_header_from_anns(undefined, Anns) ->
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
update_header_from_anns(Header, Anns) ->
DeliveryCount = case Anns of
#{delivery_count := C} -> C;
_ -> 0
end,
Redelivered = case Anns of
#{redelivered := R} -> R;
_ -> false
end,
not Redelivered.
FirstAcq = not Redelivered andalso
DeliveryCount =:= 0 andalso
not is_map_key(deaths, Anns),
Header#'v1_0.header'{first_acquirer = FirstAcq,
delivery_count = {uint, DeliveryCount}}.

encode_deaths(Deaths) ->
lists:map(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ convert_from(mc_amqp, Sections, Env) ->
{Headers2, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),

Headers = case Env of
#{message_containers_store_amqp_v1 := false} ->
#{'rabbitmq_4.0.0' := false} ->
Headers3 = case AProp of
undefined ->
Headers2;
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
Ex#resource.name;
get_annotation(id, #basic_message{id = Id}) ->
Id.
Id;
get_annotation(_Key, #basic_message{}) ->
undefined.

set_annotation(id, Value, #basic_message{} = Msg) ->
Msg#basic_message{id = Value};
Expand Down
64 changes: 40 additions & 24 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),

%% When feature flag credit_api_v2 becomes required,
%% When feature flag rabbitmq_4.0.0 becomes required,
%% the following 2 fields should be deleted.
credit_api_version :: 1 | 2,
%% When credit API v1 is used, our session process holds the delivery-count
Expand Down Expand Up @@ -225,7 +225,7 @@
frames :: [transfer_frame_body(), ...],
queue_ack_required :: boolean(),
%% Queue that sent us this message.
%% When feature flag credit_api_v2 becomes required, this field should be deleted.
%% When feature flag rabbitmq_4.0.0 becomes required, this field should be deleted.
queue_pid :: pid() | credit_api_v2,
delivery_id :: delivery_number(),
outgoing_unsettled :: #outgoing_unsettled{}
Expand Down Expand Up @@ -1068,17 +1068,17 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
QType = amqqueue:get_type(Q),
%% Whether credit API v1 or v2 is used is decided only here at link attachment time.
%% This decision applies to the whole life time of the link.
%% This means even when feature flag credit_api_v2 will be enabled later, this consumer will
%% This means even when feature flag rabbitmq_4.0.0 will be enabled later, this consumer will
%% continue to use credit API v1. This is the safest and easiest solution avoiding
%% transferring link flow control state (the delivery-count) at runtime from this session
%% process to the queue process.
%% Eventually, after feature flag credit_api_v2 gets enabled and a subsequent rolling upgrade,
%% Eventually, after feature flag rabbitmq_4.0.0 gets enabled and a subsequent rolling upgrade,
%% all consumers will use credit API v2.
%% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link
%% flow control state. Hence, credit API mixed version isn't an issue for streams.
{CreditApiVsn, Mode, DeliveryCount, ClientFlowCtl,
QueueFlowCtl, CreditReqInFlight, StashedCreditReq} =
case rabbit_feature_flags:is_enabled(credit_api_v2) orelse
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') orelse
QType =:= rabbit_stream_queue of
true ->
{2,
Expand Down Expand Up @@ -1861,20 +1861,30 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
discard;
settle_op_from_outcome(#'v1_0.released'{}) ->
requeue;
%% Keep the same Modified behaviour as in RabbitMQ 3.x
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
undeliverable_here = UndelHere})
when UndelHere =/= true ->
requeue;
settle_op_from_outcome(#'v1_0.modified'{}) ->
%% If delivery_failed is not true, we can't increment its delivery_count.
%% So, we will have to reject without requeue.
%%
%% If undeliverable_here is true, this is not quite correct because
%% undeliverable_here refers to the link, and not the message in general.
%% However, we cannot filter messages from being assigned to individual consumers.
%% That's why we will have to reject it without requeue.
discard;

%% Not all queue types support the modified outcome fields correctly.
%% However, we still allow the client to settle with the modified outcome
%% because some client libraries such as Apache QPid make use of it:
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
%% In such cases, it's better when RabbitMQ does not end the session.
%% See https://github.com/rabbitmq/rabbitmq-server/issues/6121
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
undeliverable_here = UndelHere,
message_annotations = Anns0
}) ->
Anns = case Anns0 of
#'v1_0.message_annotations'{content = C} ->
C;
_ ->
[]
end,
{modify,
default(DelFailed, false),
default(UndelHere, false),
%% TODO: this must exist elsewhere
lists:foldl(fun ({{symbol, K}, V}, Acc) ->
Acc#{K => unwrap(V)}
end, #{}, Anns)};
settle_op_from_outcome(Outcome) ->
protocol_error(
?V_1_0_AMQP_ERROR_INVALID_FIELD,
Expand Down Expand Up @@ -1981,7 +1991,7 @@ handle_queue_actions(Actions, State) ->
S0 = #state{outgoing_links = OutgoingLinks0,
outgoing_pending = Pending}) ->
%% credit API v1
%% Delete this branch when feature flag credit_api_v2 becomes required.
%% Delete this branch when feature flag rabbitmq_4.0.0 becomes required.
Handle = ctag_to_handle(Ctag),
Link = #outgoing_link{delivery_count = Count0} = maps:get(Handle, OutgoingLinks0),
{Count, Credit, S} = case Drain of
Expand Down Expand Up @@ -2788,7 +2798,7 @@ delivery_count_rcv(undefined) ->
%% credits to a queue has to synchronously wait for a credit reply from the queue:
%% https://github.com/rabbitmq/rabbitmq-server/blob/b9566f4d02f7ceddd2f267a92d46affd30fb16c8/deps/rabbitmq_codegen/credit_extension.json#L43
%% This blocks our entire AMQP 1.0 session process. Since the credit reply from the
%% queue did not contain the consumr tag prior to feature flag credit_api_v2, we
%% queue did not contain the consumr tag prior to feature flag rabbitmq_4.0.0, we
%% must behave here the same way as non-native AMQP 1.0: We wait until the queue
%% sends us a credit reply sucht that we can correlate that reply with our consumer tag.
process_credit_reply_sync(
Expand Down Expand Up @@ -2853,7 +2863,7 @@ process_credit_reply_sync_quorum_queue(Ctag, QName, Credit, State0) ->
no_return().
credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag credit_api_v2",
"Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
Expand Down Expand Up @@ -3441,12 +3451,13 @@ cap_credit(DesiredCredit) ->
min(DesiredCredit, MaxCredit).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
Feature = 'rabbitmq_4.0.0',
IsEnabled = rabbit_feature_flags:is_enabled(Feature),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
McEnv = #{Feature => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
Expand Down Expand Up @@ -3497,3 +3508,8 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

unwrap({_Tag, V}) ->
V;
unwrap(V) ->
V.
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ send_command_sync(Writer, ChannelNum, Performative) ->
Request = {send_command, ChannelNum, Performative},
gen_server:call(Writer, Request, ?CALL_TIMEOUT).

%% Delete this function when feature flag credit_api_v2 becomes required.
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
-spec send_command_and_notify(pid(),
pid(),
rabbit_types:channel_number(),
Expand Down Expand Up @@ -111,7 +111,7 @@ handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) ->
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
Expand All @@ -131,7 +131,7 @@ handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason},
credit_flow:peer_down(SessionPid),
State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)},
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
no_reply(State).
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ handle_cast({credit, SessionPid, CTag, Credit, Drain},
backing_queue = BQ,
backing_queue_state = BQS0} = State) ->
%% Credit API v1.
%% Delete this function clause when feature flag credit_api_v2 becomes required.
%% Delete this function clause when feature flag rabbitmq_4.0.0 becomes required.
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
rabbit_classic_queue:send_credit_reply_credit_api_v1(
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
Expand Down
Loading
Loading