Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2818,17 +2818,8 @@ get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
GCT
end.

get_consumer_timeout(PA = #pending_ack{tag = CTag},
State = #ch{consumer_mapping = CMap}) ->
case maps:find(CTag, CMap) of
{ok, {_, {_, _, _, Args}}} ->
case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
{long, Timeout} -> Timeout;
_ -> get_queue_consumer_timeout(PA, State)
end;
_ ->
get_queue_consumer_timeout(PA, State)
end.
get_consumer_timeout(PA, State) ->
get_queue_consumer_timeout(PA, State).

evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
case ?QUEUE:get(UAMQ, empty) of
Expand Down
44 changes: 14 additions & 30 deletions deps/rabbit/test/consumer_timeout_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,27 @@
-define(GROUP_CONFIG,
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, []}],
{queue_arguments, []}],
queue_policy_consumer_timeout => [{rabbit, []},
{queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
{queue_arguments, []},
{consumer_arguments, []}],
{queue_arguments, []}],
queue_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
{consumer_arguments, []}],
consumer_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).

-import(quorum_queue_utils, [wait_for_messages/2]).

all() ->
[
{group, global_consumer_timeout},
{group, queue_policy_consumer_timeout},
{group, queue_argument_consumer_timeout},
{group, consumer_argument_consumer_timeout}
{group, queue_argument_consumer_timeout}
].

groups() ->
ConsumerTests = [consumer_timeout,
consumer_timeout_no_basic_cancel_capability],
AllTests = ConsumerTests ++ [consumer_timeout_basic_get],

ConsumerTestsParallel = [
{classic_queue, [parallel], ConsumerTests},
{mirrored_queue, [parallel], ConsumerTests},
{quorum_queue, [parallel], ConsumerTests}
],
AllTests = [consumer_timeout,
consumer_timeout_no_basic_cancel_capability,
consumer_timeout_basic_get],

AllTestsParallel = [
{classic_queue, [parallel], AllTests},
Expand All @@ -63,8 +49,7 @@ groups() ->
[
{global_consumer_timeout, [], AllTestsParallel},
{queue_policy_consumer_timeout, [], AllTestsParallel},
{queue_argument_consumer_timeout, [], AllTestsParallel},
{consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
{queue_argument_consumer_timeout, [], AllTestsParallel}
].

suite() ->
Expand Down Expand Up @@ -158,7 +143,7 @@ consumer_timeout(Config) ->
declare_queue(Ch, Config, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
subscribe(Ch, QName, false),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
receive
Expand Down Expand Up @@ -226,7 +211,7 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
subscribe(Ch, QName, false),
receive
{#'basic.deliver'{delivery_tag = _,
redelivered = false}, _} ->
Expand Down Expand Up @@ -280,14 +265,13 @@ consume(Ch, QName, NoAck, Payloads) ->
DTag
end || Payload <- Payloads].

subscribe(Ch, Queue, NoAck, Args) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).
subscribe(Ch, Queue, NoAck) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>).

subscribe(Ch, Queue, NoAck, Ctag, Args) ->
subscribe(Ch, Queue, NoAck, Ctag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
consumer_tag = Ctag,
arguments = Args
consumer_tag = Ctag
},
self()),
receive
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_management/priv/www/js/global.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ var HELP = {
'queue-message-ttl':
'How long a message published to a queue can live before it is discarded (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#per-queue-message-ttl">x-message-ttl</a>" argument.)',

'queue-consumer-timeout':
'If a consumer does not ack its delivery for more than the <a href="https://www.rabbitmq.com/consumers.html#acknowledgement-timeout">timeout value</a> (30 minutes by default), its channel will be closed with a <code>PRECONDITION_FAILED</code> channel exception.',

'queue-expires':
'How long a queue can be unused for before it is automatically deleted (milliseconds).<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/ttl.html#queue-ttl">x-expires</a>" argument.)',

Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/consumers.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<th>Prefetch count</th>
<th>Active <span class="help" id="consumer-active"></span></th>
<th>Activity status</th>
<th>Consumer Timeout</th>
<th>Arguments</th>
</tr>
</thead>
Expand All @@ -34,6 +35,7 @@
<td class="c"><%= consumer.prefetch_count %></td>
<td class="c"><%= fmt_boolean(consumer.active) %></td>
<td class="c"><%= fmt_activity_status(consumer.activity_status) %></td>
<td class="c"><%= consumer.consumer_timeout %></td>
<td class="c"><%= fmt_table_short(consumer.arguments) %></td>
</tr>
<% } %>
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<span class="argument-link" field="definition" key="dead-letter-exchange" type="string">Dead letter exchange</span> |
<span class="argument-link" field="definition" key="dead-letter-routing-key" type="string">Dead letter routing key</span><br/>
<span class="argument-link" field="definition" key="message-ttl" type="number">Message TTL</span><span class="help" id="queue-message-ttl"></span></br>
<span class="argument-link" field="definition" key="consumer-timeout" type="number">Consumer Timeout</span><span class="help" id="queue-consumer-timeout"></span></br>
</td>
<tr>
<td>Queues [Classic]</td>
Expand Down
26 changes: 25 additions & 1 deletion deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,31 @@ augment_consumer({{Q, Ch, CTag}, Props}) ->
[{queue, format_resource(Q)},
{channel_details, augment_channel_pid(Ch)},
{channel_pid, Ch},
{consumer_tag, CTag} | Props].
{consumer_tag, CTag},
{consumer_timeout, consumer_timeout(Props, Q)} | Props].

consumer_timeout(_Props, Q) ->
get_queue_consumer_timeout(Q, get_global_consumer_timeout()).

get_queue_consumer_timeout(QName, GCT) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} -> %% should we account for different queue states here?
case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
fun (X, Y) -> erlang:min(X, Y) end, Q) of
undefined -> GCT;
Val -> Val
end;
_ ->
GCT
end.

get_global_consumer_timeout() ->
case application:get_env(rabbit, consumer_timeout) of
{ok, MS} when is_integer(MS) ->
MS;
_ ->
undefined
end.

consumers_by_vhost(VHost) ->
ets:select(consumer_stats,
Expand Down