Skip to content

Commit 7e5d6f5

Browse files
gomoripetiansd
authored and committed
Add global histogram metrics for received message sizes per-protocol
fixup: add new files to bazel. fixup: expose message_size_bytes as Prometheus classic histogram type; `rabbit_msg_size_metrics` does not use `seshat` any more, but `counters` directly. fixup: add msg_size_metrics unit test.
1 parent 474d76f commit 7e5d6f5

13 files changed

+450
-8
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ rabbitmq_integration_suite(
478478
],
479479
)
480480

481+
rabbitmq_integration_suite(
482+
name = "global_metrics_SUITE",
483+
size = "small",
484+
)
485+
481486
rabbitmq_integration_suite(
482487
name = "list_consumers_sanity_check_SUITE",
483488
size = "medium",
@@ -993,6 +998,11 @@ rabbitmq_integration_suite(
993998
size = "medium",
994999
)
9951000

1001+
rabbitmq_suite(
1002+
name = "unit_msg_size_metrics_SUITE",
1003+
size = "small",
1004+
)
1005+
9961006
rabbitmq_suite(
9971007
name = "unit_operator_policy_SUITE",
9981008
size = "small",

deps/rabbit/app.bzl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def all_beam_files(name = "all_beam_files"):
169169
"src/rabbit_metrics.erl",
170170
"src/rabbit_mirror_queue_misc.erl",
171171
"src/rabbit_mnesia.erl",
172+
"src/rabbit_msg_size_metrics.erl",
172173
"src/rabbit_msg_store.erl",
173174
"src/rabbit_msg_store_gc.erl",
174175
"src/rabbit_networking.erl",
@@ -425,6 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
425426
"src/rabbit_metrics.erl",
426427
"src/rabbit_mirror_queue_misc.erl",
427428
"src/rabbit_mnesia.erl",
429+
"src/rabbit_msg_size_metrics.erl",
428430
"src/rabbit_msg_store.erl",
429431
"src/rabbit_msg_store_gc.erl",
430432
"src/rabbit_networking.erl",
@@ -1714,6 +1716,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17141716
erlc_opts = "//:test_erlc_opts",
17151717
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
17161718
)
1719+
erlang_bytecode(
1720+
name = "unit_msg_size_metrics_SUITE_beam_files",
1721+
testonly = True,
1722+
srcs = ["test/unit_msg_size_metrics_SUITE.erl"],
1723+
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
1724+
app_name = "rabbit",
1725+
erlc_opts = "//:test_erlc_opts",
1726+
deps = [],
1727+
)
17171728
erlang_bytecode(
17181729
name = "unit_operator_policy_SUITE_beam_files",
17191730
testonly = True,

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,6 +2337,7 @@ incoming_link_transfer(
23372337
end,
23382338
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
23392339
validate_message_size(PayloadBin, MaxMessageSize),
2340+
rabbit_global_counters:message_size(?PROTOCOL, byte_size(PayloadBin)),
23402341

23412342
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23422343
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,7 @@ check_msg_size(Content, GCThreshold) ->
985985
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
986986
case Size =< MaxMessageSize of
987987
true ->
988+
rabbit_global_counters:message_size(amqp091, Size),
988989
ok;
989990
false ->
990991
Fmt = case MaxMessageSize of

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
publisher_deleted/1,
3535
consumer_created/1,
3636
consumer_deleted/1,
37+
message_size/2,
3738
messages_dead_lettered/4,
3839
messages_dead_lettered_confirmed/3
3940
]).
@@ -186,17 +187,21 @@ init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
186187
init(Labels = [{protocol, Protocol}], Extra) ->
187188
_ = seshat:new_group(?MODULE),
188189
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
189-
persistent_term:put({?MODULE, Protocol}, Counters);
190+
persistent_term:put({?MODULE, Protocol}, Counters),
191+
rabbit_msg_size_metrics:init(Labels);
190192
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
191193
_ = seshat:new_group(?MODULE),
192194
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
193195
persistent_term:put({?MODULE, QueueType, DLS}, Counters).
194196

195197
overview() ->
196-
seshat:overview(?MODULE).
198+
maps:merge_with(
199+
fun(_Key, Value1, Value2) -> maps:merge(Value1, Value2) end,
200+
rabbit_msg_size_metrics:overview(),
201+
seshat:overview(?MODULE)).
197202

198203
prometheus_format() ->
199-
seshat:format(?MODULE).
204+
maps:merge(seshat:format(?MODULE), rabbit_msg_size_metrics:prometheus_format()).
200205

201206
increase_protocol_counter(Protocol, Counter, Num) ->
202207
counters:add(fetch(Protocol), Counter, Num).
@@ -255,6 +260,9 @@ consumer_created(Protocol) ->
255260
consumer_deleted(Protocol) ->
256261
counters:add(fetch(Protocol), ?CONSUMERS, -1).
257262

263+
message_size(Protocol, MessageSize) ->
264+
rabbit_msg_size_metrics:update(Protocol, MessageSize).
265+
258266
messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
259267
Index = case Reason of
260268
maxlen -> ?MESSAGES_DEAD_LETTERED_MAXLEN;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% This module tracks received message size distribution as histograms.
9+
%% (One histogram is represented by a set of counters, one for each
10+
%% bucket.)
11+
-module(rabbit_msg_size_metrics).
12+
13+
-export([init/1,
14+
overview/0,
15+
prometheus_format/0,
16+
update/2]).
17+
18+
%% Useful for testing
19+
-export([overview/1,
20+
changed_buckets/2,
21+
cleanup/1]).
22+
23+
%% Histogram layout as {CounterIndex, UpperBound} pairs in ascending order.
%% A message is counted in the first bucket whose UpperBound is not smaller
%% than its size; the final `infinity' bucket catches everything else.
-define(MSG_SIZE_BUCKETS,
        [{1, 64},
         {2, 256},
         {3, 1024},
         {4, 4 * 1024},
         {5, 16 * 1024},
         {6, 64 * 1024},
         {7, 256 * 1024},
         {8, 1024 * 1024},
         {9, 4 * 1024 * 1024},
         {10, 16 * 1024 * 1024},
         {11, 64 * 1024 * 1024},
         {12, 256 * 1024 * 1024},
         {13, infinity}]).
%% Counter index that accumulates the sum of all observed message sizes.
%% init/1 also uses this value as the size of the counters array, so it has
%% to be greater than every bucket index listed above.
-define(MSG_SIZE_SUM_POS, 14).

-type labels() :: [{protocol, atom()}].
%% Keys are the bucket upper bounds -- including the `infinity' key of the
%% catch-all bucket -- plus `sum'.
-type hist_values() :: #{BucketUpperBound :: integer() | infinity => integer(),
                         sum => integer()}.
41+
42+
%% Allocates the histogram counters for the protocol named in the label
%% list and stores them so that update/2 and the overview functions can
%% look them up later.
-spec init(labels()) -> any().
init([{protocol, Proto}]) ->
    %% One slot per bucket plus the trailing slot that holds the size sum.
    put_counters(Proto, counters:new(?MSG_SIZE_SUM_POS, [write_concurrency])).
47+
48+
%% Forgets the counters stored for the protocol named in the label list.
%% Exported for tests.
-spec cleanup(labels()) -> any().
cleanup([{protocol, Proto}]) ->
    delete_counters(Proto).
51+
52+
%% Returns, for every label list that has counters registered, a map with a
%% single `message_size_bytes' entry holding that histogram's values.
-spec overview() -> #{labels() => #{atom() => hist_values()}}.
overview() ->
    lists:foldl(
      fun(Labels, Acc) ->
              Acc#{Labels => #{message_size_bytes => overview(Labels)}}
      end, #{}, fetch_labels()).
56+
57+
%% Returns the per-bucket counts (keyed by bucket upper bound) together
%% with the `sum' of all observed sizes for one label list.
-spec overview(labels()) -> hist_values().
overview(Labels) ->
    {PerBucket, Total} = values(Labels),
    maps:put(sum, Total, maps:from_list(PerBucket)).
62+
63+
%% Describes the `message_size_bytes' histogram (type, help text and one
%% values tuple per registered label list) for the Prometheus exporter.
-spec prometheus_format() -> #{atom() => map()}.
prometheus_format() ->
    Histogram = #{type => histogram,
                  help => "Size of messages received from publishers",
                  values => lists:map(fun prometheus_values/1, fetch_labels())},
    #{message_size_bytes => Histogram}.
71+
72+
%% Records one received message: bumps the bucket that MessageSize falls
%% into and adds MessageSize to the running sum for Protocol.
-spec update(atom(), integer()) -> any().
update(Protocol, MessageSize) ->
    Ref = fetch_counters(Protocol),
    counters:add(Ref, find_hist_bucket(?MSG_SIZE_BUCKETS, MessageSize), 1),
    counters:add(Ref, ?MSG_SIZE_SUM_POS, MessageSize).
78+
79+
%% Returns the entries of After whose value differs from the entry with the
%% same key in Before, mapped to the difference (After - Before). Entries
%% that did not change are left out. Every key of After must also be present
%% in Before.
-spec changed_buckets(hist_values(), hist_values()) -> hist_values().
changed_buckets(After, Before) ->
    maps:fold(
      fun(Bucket, NewValue, Changed) ->
              case NewValue - maps:get(Bucket, Before) of
                  0 -> Changed;
                  Delta -> Changed#{Bucket => Delta}
              end
      end, #{}, After).
88+
89+
%%
90+
%% Helper functions
91+
%%
92+
93+
%% Walks the {Position, UpperBound} list and returns the position of the
%% first bucket whose upper bound is not smaller than MessageSize. The last
%% bucket in the list is a catch-all: it is returned regardless of its bound.
find_hist_bucket([{LastPos, _CatchAll}], _MessageSize) ->
    LastPos;
find_hist_bucket([{Pos, Limit} | Rest], MessageSize) ->
    case MessageSize =< Limit of
        true -> Pos;
        false -> find_hist_bucket(Rest, MessageSize)
    end;
find_hist_bucket([_ | Rest], MessageSize) ->
    find_hist_bucket(Rest, MessageSize).
99+
100+
%% Returns {[{UpperBound, Count}], Sum} for one label list. The counts are
%% per bucket (not cumulative): each one covers sizes in the range
%% (UpperBound[N-1], UpperBound[N]].
values([{protocol, Proto}]) ->
    Ref = fetch_counters(Proto),
    Total = counters:get(Ref, ?MSG_SIZE_SUM_POS),
    PerBucket = lists:map(
                  fun({Pos, UpperBound}) ->
                          {UpperBound, counters:get(Ref, Pos)}
                  end, ?MSG_SIZE_BUCKETS),
    {PerBucket, Total}.
108+
109+
%% Returns {Labels, CumulativeBuckets, Count, Sum}. Bucket values are
%% cumulative counts, i.e. bucket N covers the range 0-UpperBound[N], as
%% defined by the Prometheus classic histogram format. Count is the total
%% number of observations (the value of the last cumulative bucket).
prometheus_values(Labels) ->
    {PerBucket, Sum} = values(Labels),
    {RevCumulative, Count} =
        lists:foldl(
          fun({UpperBound, N}, {Acc, Running0}) ->
                  Running = Running0 + N,
                  {[{UpperBound, Running} | Acc], Running}
          end, {[], 0}, PerBucket),
    {Labels, lists:reverse(RevCumulative), Count, Sum}.
120+
121+
%% Stores the counters reference for Protocol as a persistent term keyed by
%% this module's name and the protocol.
put_counters(Protocol, Counters) ->
    Key = {?MODULE, Protocol},
    persistent_term:put(Key, Counters).
123+
124+
%% Looks up the counters reference stored for Protocol. Fails with `badarg'
%% (from persistent_term:get/1) when nothing was stored for that protocol.
fetch_counters(Protocol) ->
    Key = {?MODULE, Protocol},
    persistent_term:get(Key).
126+
127+
%% Lists the label list ([{protocol, Protocol}]) of every protocol that has
%% counters stored by this module. This scans the result of
%% persistent_term:get/0, i.e. all persistent terms on the node, and keeps
%% only the ones keyed by {?MODULE, Protocol}.
fetch_labels() ->
    lists:filtermap(
      fun({{?MODULE, Protocol}, _Counters}) -> {true, [{protocol, Protocol}]};
         (_Other) -> false
      end, persistent_term:get()).
129+
130+
%% Erases the persistent term holding the counters for Protocol. Returns
%% what persistent_term:erase/1 returns.
delete_counters(Protocol) ->
    Key = {?MODULE, Protocol},
    persistent_term:erase(Key).

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3739,7 +3739,8 @@ global_counters(Config) ->
37393739
messages_confirmed_total := Confirmed0,
37403740
messages_routed_total := Routed0,
37413741
messages_unroutable_dropped_total := UnroutableDropped0,
3742-
messages_unroutable_returned_total := UnroutableReturned0} = get_global_counters(Config),
3742+
messages_unroutable_returned_total := UnroutableReturned0,
3743+
message_size_bytes := MsgSizeBytes0} = get_global_counters(Config),
37433744

37443745
#{messages_delivered_total := CQDelivered0,
37453746
messages_redelivered_total := CQRedelivered0,
@@ -3792,13 +3793,19 @@ global_counters(Config) ->
37923793
messages_confirmed_total := Confirmed1,
37933794
messages_routed_total := Routed1,
37943795
messages_unroutable_dropped_total := UnroutableDropped1,
3795-
messages_unroutable_returned_total := UnroutableReturned1} = get_global_counters(Config),
3796+
messages_unroutable_returned_total := UnroutableReturned1,
3797+
message_size_bytes := MsgSizeBytes} = get_global_counters(Config),
37963798
?assertEqual(Received0 + 2, Received1),
37973799
?assertEqual(ReceivedConfirm0 + 1, ReceivedConfirm1),
37983800
?assertEqual(Confirmed0 + 1, Confirmed1),
37993801
?assertEqual(Routed0 + 2, Routed1),
38003802
?assertEqual(UnroutableDropped0, UnroutableDropped1),
38013803
?assertEqual(UnroutableReturned0, UnroutableReturned1),
3804+
%% the 2 byte message body is encapsulated in an #'v1_0.data'{}
3805+
%% structure, which takes 7 bytes encoded
3806+
?assertEqual(#{64 => 2,
3807+
sum => (2 * 7)},
3808+
rabbit_msg_size_metrics:changed_buckets(MsgSizeBytes, MsgSizeBytes0)),
38023809

38033810
#{messages_delivered_total := CQDelivered1,
38043811
messages_redelivered_total := CQRedelivered1,
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(global_metrics_SUITE).
9+
10+
-compile([export_all, nowarn_export_all]).
11+
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
14+
%% Common Test entry point: every test case lives in the `tests' group.
all() ->
    [{group, tests}].
18+
19+
%% Defines the single `tests' group (no group properties) and its cases.
groups() ->
    TestCases = [message_size, over_max_message_size],
    [{tests, [], TestCases}].
26+
27+
%% -------------------------------------------------------------------
28+
%% Testsuite setup/teardown.
29+
%% -------------------------------------------------------------------
30+
31+
%% Suite setup: logs the environment, then runs the common setup steps and
%% returns whatever they return.
init_per_suite(Config0) ->
    rabbit_ct_helpers:log_environment(),
    Config = rabbit_ct_helpers:run_setup_steps(Config0),
    Config.
34+
35+
%% Suite teardown: runs the common teardown steps.
end_per_suite(Config0) ->
    Config = rabbit_ct_helpers:run_teardown_steps(Config0),
    Config.
37+
38+
%% Group setup: configures a single node whose name suffix is the group
%% name, then runs the broker and client setup steps.
init_per_group(Group, Config) ->
    NodeSettings = [{rmq_nodename_suffix, Group},
                    {rmq_nodes_count, 1}],
    Config1 = rabbit_ct_helpers:set_config(Config, NodeSettings),
    Steps = rabbit_ct_broker_helpers:setup_steps() ++
            rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(Config1, Steps).
46+
47+
%% Group teardown: runs the client teardown steps followed by the broker
%% teardown steps.
end_per_group(_Group, Config) ->
    Steps = rabbit_ct_client_helpers:teardown_steps() ++
            rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, Steps).
51+
52+
%% Per-test setup: reports the start of the test case to the CT helpers.
init_per_testcase(TestCase, Config0) ->
    Config = rabbit_ct_helpers:testcase_started(Config0, TestCase),
    Config.
54+
55+
%% Per-test teardown: reports the end of the test case to the CT helpers.
end_per_testcase(TestCase, Config0) ->
    Config = rabbit_ct_helpers:testcase_finished(Config0, TestCase),
    Config.
57+
58+
%% -------------------------------------------------------------------
59+
%% Test cases
60+
%% -------------------------------------------------------------------
61+
62+
%% Publishes two 2-byte messages and one 2 MB message over AMQP 0.9.1 and
%% checks that exactly the matching histogram buckets, and the sum, moved.
message_size(Config) ->
    Binary2B = <<"12">>,
    Binary2M = binary:copy(<<"x">>, 2_000_000),

    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    %% basic.publish is asynchronous. Publisher confirms give a point at
    %% which the broker is known to have handled every publish, so the
    %% "after" snapshot below cannot race with the broker.
    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

    Before = get_msg_size_metrics(Config),

    lists:foreach(
      fun(Payload) ->
              ok = amqp_channel:call(Ch,
                                     #'basic.publish'{routing_key = <<"none">>},
                                     #amqp_msg{payload = Payload})
      end, [Binary2B, Binary2B, Binary2M]),
    %% Timeout is in seconds.
    true = amqp_channel:wait_for_confirms(Ch, 30),

    After = get_msg_size_metrics(Config),

    %% Release the client resources before asserting so that a failed
    %% assertion does not leak the connection.
    rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),

    ?assertEqual(#{64 => 2,
                   4 * 1024 * 1024 => 1,
                   sum => 2_000_004},
                 rabbit_msg_size_metrics:changed_buckets(After, Before)).
79+
80+
%% Lowers the broker's max message size to 3 MB, publishes a 4 MB message
%% and checks that the histogram is left untouched.
over_max_message_size(Config) ->
    Binary4M = binary:copy(<<"x">>, 4_000_000),

    %% Remember the current limit so it can be put back afterwards.
    %% NOTE(review): assumes the broker has already stored `max_message_size'
    %% as a persistent term by the time the test runs -- confirm.
    OrigMax = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [max_message_size]),
    ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [max_message_size, 3_000_000]),
    try
        {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
        MRef = erlang:monitor(process, Ch),

        Before = get_msg_size_metrics(Config),

        amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),

        %% basic.publish is asynchronous. The broker is expected to close the
        %% channel when it rejects the oversized message, so wait for the
        %% channel process to go down before taking the "after" snapshot;
        %% otherwise Before == After could hold only because the broker has
        %% not processed the publish yet.
        receive
            {'DOWN', MRef, process, Ch, _Reason} -> ok
        after 30_000 ->
                ct:fail(channel_not_closed_after_oversized_publish)
        end,

        After = get_msg_size_metrics(Config),

        rabbit_ct_client_helpers:close_connection(Conn),

        %% No metrics are bumped if over max message size
        ?assertEqual(Before, After)
    after
        %% Do not leak the lowered limit into other test cases.
        ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [max_message_size, OrigMax])
    end.
95+
96+
%% -------------------------------------------------------------------
97+
%% Implementation
98+
%% -------------------------------------------------------------------
99+
100+
%% Fetches the AMQP 0.9.1 message size histogram from the broker node by
%% calling rabbit_msg_size_metrics:overview/1 over RPC.
get_msg_size_metrics(Config) ->
    Labels = [{protocol, amqp091}],
    rabbit_ct_broker_helpers:rpc(Config, rabbit_msg_size_metrics, overview, [Labels]).

0 commit comments

Comments
 (0)