Skip to content

Commit 1e3f4e5

Browse files
ansdgomoripeti
authored andcommitted
Emit histogram metric for received message sizes per protocol (#12342)
* Add global histogram metrics for received message sizes per-protocol fixup: add new files to bazel fixup: expose message_size_bytes as prometheus classic histogram type `rabbit_msg_size_metrics` does not use `seshat` any more, but `counters` directly. fixup: add msg_size_metrics unit test * Improve message size histogram 1. Avoid unnecessary time series emitted for stream protocol The stream protocol cannot observe message sizes. This commit ensures that the following time series are omitted: ``` rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0 rabbitmq_global_message_size_bytes_count{protocol="stream"} 0 rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0 ``` This reduces the number of time series by 15. 2. Further reduce the number of time series by reducing the number of buckets. Instead of 13 bucktes, emit only 9 buckets. Buckets are not free, each is an extra time series stored. Prior to this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 92 ``` After this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 57 ``` 3. The emitted metric should be called `rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`. The latter is poor naming. There is no need to use `global` in the metric name given that this metric doesn't exist in the old flawed aggregated metrics. 4. This commit simplies module `rabbit_global_counters`. 5. Avoid garbage collecting the 10-elements list of buckets per message being received. --------- Co-authored-by: Péter Gömöri <[email protected]>
1 parent 8377eda commit 1e3f4e5

17 files changed

+533
-34
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,13 @@ rabbitmq_integration_suite(
478478
],
479479
)
480480

481+
rabbitmq_integration_suite(
482+
name = "msg_size_metrics_SUITE",
483+
runtime_deps = [
484+
"//deps/rabbitmq_amqp_client:erlang_app",
485+
],
486+
)
487+
481488
rabbitmq_integration_suite(
482489
name = "list_consumers_sanity_check_SUITE",
483490
size = "medium",
@@ -993,6 +1000,11 @@ rabbitmq_integration_suite(
9931000
size = "medium",
9941001
)
9951002

1003+
rabbitmq_suite(
1004+
name = "unit_msg_size_metrics_SUITE",
1005+
size = "small",
1006+
)
1007+
9961008
rabbitmq_suite(
9971009
name = "unit_operator_policy_SUITE",
9981010
size = "small",

deps/rabbit/app.bzl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def all_beam_files(name = "all_beam_files"):
169169
"src/rabbit_metrics.erl",
170170
"src/rabbit_mirror_queue_misc.erl",
171171
"src/rabbit_mnesia.erl",
172+
"src/rabbit_msg_size_metrics.erl",
172173
"src/rabbit_msg_store.erl",
173174
"src/rabbit_msg_store_gc.erl",
174175
"src/rabbit_networking.erl",
@@ -425,6 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
425426
"src/rabbit_metrics.erl",
426427
"src/rabbit_mirror_queue_misc.erl",
427428
"src/rabbit_mnesia.erl",
429+
"src/rabbit_msg_size_metrics.erl",
428430
"src/rabbit_msg_store.erl",
429431
"src/rabbit_msg_store_gc.erl",
430432
"src/rabbit_networking.erl",
@@ -703,6 +705,7 @@ def all_srcs(name = "all_srcs"):
703705
"src/rabbit_metrics.erl",
704706
"src/rabbit_mirror_queue_misc.erl",
705707
"src/rabbit_mnesia.erl",
708+
"src/rabbit_msg_size_metrics.erl",
706709
"src/rabbit_msg_store.erl",
707710
"src/rabbit_msg_store_gc.erl",
708711
"src/rabbit_networking.erl",
@@ -1714,6 +1717,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17141717
erlc_opts = "//:test_erlc_opts",
17151718
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
17161719
)
1720+
erlang_bytecode(
1721+
name = "unit_msg_size_metrics_SUITE_beam_files",
1722+
testonly = True,
1723+
srcs = ["test/unit_msg_size_metrics_SUITE.erl"],
1724+
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
1725+
app_name = "rabbit",
1726+
erlc_opts = "//:test_erlc_opts",
1727+
)
17171728
erlang_bytecode(
17181729
name = "unit_operator_policy_SUITE_beam_files",
17191730
testonly = True,
@@ -2183,3 +2194,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21832194
app_name = "rabbit",
21842195
erlc_opts = "//:test_erlc_opts",
21852196
)
2197+
erlang_bytecode(
2198+
name = "msg_size_metrics_SUITE_beam_files",
2199+
testonly = True,
2200+
srcs = ["test/msg_size_metrics_SUITE.erl"],
2201+
outs = ["test/msg_size_metrics_SUITE.beam"],
2202+
app_name = "rabbit",
2203+
erlc_opts = "//:test_erlc_opts",
2204+
deps = ["//deps/amqp_client:erlang_app"],
2205+
)

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2336,7 +2336,9 @@ incoming_link_transfer(
23362336
{MsgBin0, FirstDeliveryId, FirstSettled}
23372337
end,
23382338
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
2339-
validate_message_size(PayloadBin, MaxMessageSize),
2339+
PayloadSize = iolist_size(PayloadBin),
2340+
validate_message_size(PayloadSize, MaxMessageSize),
2341+
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
23402342

23412343
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23422344
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
@@ -3066,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) ->
30663068

30673069
validate_message_size(_, unlimited) ->
30683070
ok;
3069-
validate_message_size(Message, MaxMsgSize)
3070-
when is_integer(MaxMsgSize) ->
3071-
MsgSize = iolist_size(Message),
3071+
validate_message_size(MsgSize, MaxMsgSize)
3072+
when is_integer(MsgSize) ->
30723073
case MsgSize =< MaxMsgSize of
30733074
true ->
30743075
ok;
@@ -3082,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize)
30823083
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
30833084
"message size (~b bytes) > maximum message size (~b bytes)",
30843085
[MsgSize, MaxMsgSize])
3085-
end.
3086+
end;
3087+
validate_message_size(Msg, MaxMsgSize) ->
3088+
validate_message_size(iolist_size(Msg), MaxMsgSize).
30863089

30873090
-spec ensure_terminus(source | target,
30883091
term(),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -985,7 +985,7 @@ check_msg_size(Content, GCThreshold) ->
985985
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
986986
case Size =< MaxMessageSize of
987987
true ->
988-
ok;
988+
rabbit_msg_size_metrics:observe(amqp091, Size);
989989
false ->
990990
Fmt = case MaxMessageSize of
991991
?MAX_MSG_SIZE ->

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
boot_step/0,
1414
init/1,
1515
init/2,
16-
overview/0,
1716
prometheus_format/0,
1817
increase_protocol_counter/3,
1918
messages_received/2,
@@ -38,6 +37,10 @@
3837
messages_dead_lettered_confirmed/3
3938
]).
4039

40+
-ifdef(TEST).
41+
-export([overview/0]).
42+
-endif.
43+
4144
%% PROTOCOL COUNTERS:
4245
-define(MESSAGES_RECEIVED, 1).
4346
-define(MESSAGES_RECEIVED_CONFIRM, 2).
@@ -132,12 +135,14 @@
132135
boot_step() ->
133136
[begin
134137
%% Protocol counters
135-
init([{protocol, Proto}]),
138+
Protocol = {protocol, Proto},
139+
init([Protocol]),
140+
rabbit_msg_size_metrics:init(Proto),
136141

137142
%% Protocol & Queue Type counters
138-
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
139-
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
140-
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
143+
init([Protocol, {queue_type, rabbit_classic_queue}]),
144+
init([Protocol, {queue_type, rabbit_quorum_queue}]),
145+
init([Protocol, {queue_type, rabbit_stream_queue}])
141146
end || Proto <- [amqp091, amqp10]],
142147

143148
%% Dead Letter counters
@@ -192,8 +197,10 @@ init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetter
192197
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
193198
persistent_term:put({?MODULE, QueueType, DLS}, Counters).
194199

200+
-ifdef(TEST).
195201
overview() ->
196202
seshat:overview(?MODULE).
203+
-endif.
197204

198205
prometheus_format() ->
199206
seshat:format(?MODULE).
@@ -247,13 +254,13 @@ publisher_created(Protocol) ->
247254
counters:add(fetch(Protocol), ?PUBLISHERS, 1).
248255

249256
publisher_deleted(Protocol) ->
250-
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
257+
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).
251258

252259
consumer_created(Protocol) ->
253260
counters:add(fetch(Protocol), ?CONSUMERS, 1).
254261

255262
consumer_deleted(Protocol) ->
256-
counters:add(fetch(Protocol), ?CONSUMERS, -1).
263+
counters:sub(fetch(Protocol), ?CONSUMERS, 1).
257264

258265
messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
259266
Index = case Reason of
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% This module tracks received message size distribution as histogram.
9+
%% (A histogram is represented by a set of counters, one for each bucket.)
10+
-module(rabbit_msg_size_metrics).
11+
12+
-export([init/1,
13+
observe/2,
14+
prometheus_format/0]).
15+
16+
%% Integration tests.
17+
-export([raw_buckets/1,
18+
diff_raw_buckets/2]).
19+
20+
-ifdef(TEST).
21+
-export([cleanup/1]).
22+
-endif.
23+
24+
-define(BUCKET_1, 100).
25+
-define(BUCKET_2, 1_000).
26+
-define(BUCKET_3, 10_000).
27+
-define(BUCKET_4, 100_000).
28+
-define(BUCKET_5, 1_000_000).
29+
-define(BUCKET_6, 10_000_000).
30+
%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB.
31+
%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB.
32+
%% To help finding an appropriate rabbit.max_message_size we also add a bucket for 50 MB.
33+
-define(BUCKET_7, 50_000_000).
34+
-define(BUCKET_8, 100_000_000).
35+
%% 'infinity' means practically 512 MiB as hard limited in
36+
%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257
37+
-define(BUCKET_9, 'infinity').
38+
39+
-define(MSG_SIZE_BUCKETS,
40+
[{1, ?BUCKET_1},
41+
{2, ?BUCKET_2},
42+
{3, ?BUCKET_3},
43+
{4, ?BUCKET_4},
44+
{5, ?BUCKET_5},
45+
{6, ?BUCKET_6},
46+
{7, ?BUCKET_7},
47+
{8, ?BUCKET_8},
48+
{9, ?BUCKET_9}]).
49+
50+
-define(POS_MSG_SIZE_SUM, 10).
51+
52+
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
53+
NumObservations :: non_neg_integer()}].
54+
55+
-spec init(atom()) -> ok.
56+
init(Protocol) ->
57+
Size = ?POS_MSG_SIZE_SUM,
58+
Counters = counters:new(Size, [write_concurrency]),
59+
put_counters(Protocol, Counters).
60+
61+
-spec observe(atom(), non_neg_integer()) -> ok.
62+
observe(Protocol, MessageSize) ->
63+
BucketPos = find_bucket_pos(MessageSize),
64+
Counters = get_counters(Protocol),
65+
counters:add(Counters, BucketPos, 1),
66+
counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize).
67+
68+
-spec prometheus_format() -> #{atom() => map()}.
69+
prometheus_format() ->
70+
Values = [prometheus_values(Counters) || Counters <- get_labels_counters()],
71+
#{message_size_bytes => #{type => histogram,
72+
help => "Size of messages received from publishers",
73+
values => Values}}.
74+
75+
find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1;
76+
find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2;
77+
find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3;
78+
find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4;
79+
find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5;
80+
find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6;
81+
find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7;
82+
find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8;
83+
find_bucket_pos(_Size) -> 9.
84+
85+
raw_buckets(Protocol)
86+
when is_atom(Protocol) ->
87+
Counters = get_counters(Protocol),
88+
raw_buckets(Counters);
89+
raw_buckets(Counters) ->
90+
[{UpperBound, counters:get(Counters, Pos)}
91+
|| {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS].
92+
93+
-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets().
94+
diff_raw_buckets(After, Before) ->
95+
diff_raw_buckets(After, Before, []).
96+
97+
diff_raw_buckets([], [], Acc) ->
98+
lists:reverse(Acc);
99+
diff_raw_buckets([{UpperBound, CounterAfter} | After],
100+
[{UpperBound, CounterBefore} | Before],
101+
Acc) ->
102+
case CounterAfter - CounterBefore of
103+
0 ->
104+
diff_raw_buckets(After, Before, Acc);
105+
Diff ->
106+
diff_raw_buckets(After, Before, [{UpperBound, Diff} | Acc])
107+
end.
108+
109+
%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets
110+
%% aren’t just a count of events that fall into them. The buckets also include a count of
111+
%% events in all the smaller buckets, all the way up to the +Inf, bucket which is the total
112+
%% number of events. This is known as a cumulative histogram, and why the bucket label
113+
%% is called le, standing for less than or equal to.
114+
%% This is in addition to buckets being counters, so Prometheus histograms are cumula‐
115+
%% tive in two different ways."
116+
%% [Prometheus: Up & Running]
117+
prometheus_values({Labels, Counters}) ->
118+
{Buckets, Count} = lists:mapfoldl(
119+
fun({UpperBound, NumObservations}, Acc0) ->
120+
Acc = Acc0 + NumObservations,
121+
{{UpperBound, Acc}, Acc}
122+
end, 0, raw_buckets(Counters)),
123+
Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM),
124+
{Labels, Buckets, Count, Sum}.
125+
126+
put_counters(Protocol, Counters) ->
127+
persistent_term:put({?MODULE, Protocol}, Counters).
128+
129+
get_counters(Protocol) ->
130+
persistent_term:get({?MODULE, Protocol}).
131+
132+
get_labels_counters() ->
133+
[{[{protocol, Protocol}], Counters}
134+
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
135+
136+
-ifdef(TEST).
137+
%% "Counters are not tied to the current process and are automatically
138+
%% garbage collected when they are no longer referenced."
139+
-spec cleanup(atom()) -> ok.
140+
cleanup(Protocol) ->
141+
persistent_term:erase({?MODULE, Protocol}),
142+
ok.
143+
-endif.

0 commit comments

Comments
 (0)