Skip to content

Commit 7fdbae5

Browse files
dcorbachogerhard
authored andcommitted
Use number of publishing channels as global publishers in amqp091
1 parent fe4d6d2 commit 7fdbae5

File tree

4 files changed

+28
-6
lines changed

4 files changed

+28
-6
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@
165165
delivery_flow,
166166
interceptor_state,
167167
queue_states,
168-
tick_timer
168+
tick_timer,
169+
publishing_mode = false :: boolean()
169170
}).
170171

171172
-define(QUEUE, lqueue).
@@ -878,6 +879,7 @@ terminate(_Reason,
878879
rabbit_event:if_enabled(State, #ch.stats_timer,
879880
fun() -> emit_stats(State) end),
880881
[delete_stats(Tag) || {Tag, _} <- get()],
882+
maybe_decrease_global_publishers(State),
881883
rabbit_core_metrics:channel_closed(self()),
882884
rabbit_event:notify(channel_closed, [{pid, self()},
883885
{user_who_performed_action, Username}]).
@@ -1285,6 +1287,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12851287
confirm_enabled = ConfirmEnabled,
12861288
delivery_flow = Flow
12871289
}) ->
1290+
State0 = maybe_increase_global_publishers(State),
12881291
rabbit_global_counters:messages_received(amqp091, 1),
12891292
check_msg_size(Content, MaxMessageSize, GCThreshold),
12901293
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -1302,10 +1305,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
13021305
DoConfirm = Tx =/= none orelse ConfirmEnabled,
13031306
{MsgSeqNo, State1} =
13041307
case DoConfirm orelse Mandatory of
1305-
false -> {undefined, State};
1308+
false -> {undefined, State0};
13061309
true -> rabbit_global_counters:messages_received_confirm(amqp091, 1),
1307-
SeqNo = State#ch.publish_seqno,
1308-
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
1310+
SeqNo = State0#ch.publish_seqno,
1311+
{SeqNo, State0#ch{publish_seqno = SeqNo + 1}}
13091312
end,
13101313
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
13111314
{ok, Message} ->
@@ -2917,3 +2920,14 @@ find_queue_name_from_quorum_name(Name, QStates) ->
29172920
end
29182921
end,
29192922
rabbit_queue_type:fold_state(Fun, undefined, QStates).
2923+
2924+
maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
2925+
State0;
2926+
maybe_increase_global_publishers(State0) ->
2927+
rabbit_global_counters:publisher_created(amqp091),
2928+
State0#ch{publishing_mode = true}.
2929+
2930+
maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
2931+
ok;
2932+
maybe_decrease_global_publishers(#ch{publishing_mode = false}) ->
2933+
rabbit_global_counters:publisher_deleted(amqp091).

deps/rabbit/test/queue_type_SUITE.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,18 @@ smoke(Config) ->
164164
basic_ack(Ch, basic_get(Ch, QName)),
165165
%% global counters
166166
publish_and_confirm(Ch, <<"inexistent_queue">>, <<"msg4">>),
167+
ConsumerTag3 = <<"ctag3">>,
168+
ok = subscribe(Ch, QName, ConsumerTag3),
167169
ProtocolCounters = maps:get([{protocol, amqp091}], get_global_counters(Config)),
168170
?assertEqual(#{
169171
messages_confirmed_total => 4,
170172
messages_received_confirm_total => 4,
171173
messages_received_total => 4,
172174
messages_routed_total => 3,
173175
messages_unroutable_dropped_total => 1,
174-
messages_unroutable_returned_total => 0
176+
messages_unroutable_returned_total => 0,
177+
consumers => 1,
178+
publishers => 1
175179
}, ProtocolCounters),
176180
QueueType = list_to_atom(
177181
"rabbit_" ++

deps/rabbitmq_prometheus/metrics.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ To generate these:
6464
| rabbitmq_global_messages_routed_total | Total number of messages routed to queues or streams |
6565
| rabbitmq_global_messages_unroutable_dropped_total | Total number of messages published as non-mandatory into an exchange and dropped as unroutable |
6666
| rabbitmq_global_messages_unroutable_returned_total | Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable |
67+
| rabbitmq_global_publishers | Publishers currently connected |
68+
| rabbitmq_global_consumers | Consumers currently connected |
6769

6870
### Generic
6971

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,9 @@ global_metrics_present_test(Config) ->
306306
?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_delivered_get_auto_ack_total{", [{capture, none}, multiline])),
307307
?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_get_empty_total{", [{capture, none}, multiline])),
308308
?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_redelivered_total{", [{capture, none}, multiline])),
309-
?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_acknowledged_total{", [{capture, none}, multiline])).
309+
?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_acknowledged_total{", [{capture, none}, multiline])),
310+
?assertEqual(match, re:run(Body, "^rabbitmq_global_publishers{", [{capture, none}, multiline])),
311+
?assertEqual(match, re:run(Body, "^rabbitmq_global_consumers{", [{capture, none}, multiline])).
310312

311313
global_metrics_single_metric_family_test(Config) ->
312314
{_Headers, Body} = http_get_with_pal(Config, [], 200),

0 commit comments

Comments
 (0)