Skip to content

Commit 1ec2152

Browse files
committed
Improve message size histogram
1. Avoid unnecessary time series emitted for stream protocol. The stream protocol cannot observe message sizes. This commit ensures that the following time series are omitted: ``` rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0 rabbitmq_global_message_size_bytes_count{protocol="stream"} 0 rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0 ``` This reduces the number of time series by 15. 2. Further reduce the number of time series by reducing the number of buckets. Instead of 13 buckets, emit only 9 buckets. Buckets are not free; each is an extra time series stored. Prior to this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 92 ``` After this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 57 ``` 3. The emitted metric should be called `rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`. The latter is poor naming. There is no need to use `global` in the metric name given that this metric doesn't exist in the old flawed aggregated metrics. 4. 
This commit simplifies module `rabbit_global_counters`. 5. Avoid garbage collecting the 10-element list of buckets per message being received.
1 parent 7e5d6f5 commit 1ec2152

20 files changed

+459
-402
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,10 @@ rabbitmq_integration_suite(
479479
)
480480

481481
rabbitmq_integration_suite(
482-
name = "global_metrics_SUITE",
483-
size = "small",
482+
name = "msg_size_metrics_SUITE",
483+
runtime_deps = [
484+
"//deps/rabbitmq_amqp_client:erlang_app",
485+
],
484486
)
485487

486488
rabbitmq_integration_suite(

deps/rabbit/app.bzl

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def all_beam_files(name = "all_beam_files"):
169169
"src/rabbit_metrics.erl",
170170
"src/rabbit_mirror_queue_misc.erl",
171171
"src/rabbit_mnesia.erl",
172-
"src/rabbit_msg_size_metrics.erl",
172+
"src/rabbit_msg_size_metrics.erl",
173173
"src/rabbit_msg_store.erl",
174174
"src/rabbit_msg_store_gc.erl",
175175
"src/rabbit_networking.erl",
@@ -426,7 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
426426
"src/rabbit_metrics.erl",
427427
"src/rabbit_mirror_queue_misc.erl",
428428
"src/rabbit_mnesia.erl",
429-
"src/rabbit_msg_size_metrics.erl",
429+
"src/rabbit_msg_size_metrics.erl",
430430
"src/rabbit_msg_store.erl",
431431
"src/rabbit_msg_store_gc.erl",
432432
"src/rabbit_networking.erl",
@@ -705,6 +705,7 @@ def all_srcs(name = "all_srcs"):
705705
"src/rabbit_metrics.erl",
706706
"src/rabbit_mirror_queue_misc.erl",
707707
"src/rabbit_mnesia.erl",
708+
"src/rabbit_msg_size_metrics.erl",
708709
"src/rabbit_msg_store.erl",
709710
"src/rabbit_msg_store_gc.erl",
710711
"src/rabbit_networking.erl",
@@ -1723,7 +1724,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17231724
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
17241725
app_name = "rabbit",
17251726
erlc_opts = "//:test_erlc_opts",
1726-
deps = [],
17271727
)
17281728
erlang_bytecode(
17291729
name = "unit_operator_policy_SUITE_beam_files",
@@ -2194,3 +2194,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21942194
app_name = "rabbit",
21952195
erlc_opts = "//:test_erlc_opts",
21962196
)
2197+
erlang_bytecode(
2198+
name = "msg_size_metrics_SUITE_beam_files",
2199+
testonly = True,
2200+
srcs = ["test/msg_size_metrics_SUITE.erl"],
2201+
outs = ["test/msg_size_metrics_SUITE.beam"],
2202+
app_name = "rabbit",
2203+
erlc_opts = "//:test_erlc_opts",
2204+
deps = ["//deps/amqp_client:erlang_app"],
2205+
)

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2336,8 +2336,9 @@ incoming_link_transfer(
23362336
{MsgBin0, FirstDeliveryId, FirstSettled}
23372337
end,
23382338
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
2339-
validate_message_size(PayloadBin, MaxMessageSize),
2340-
rabbit_global_counters:message_size(?PROTOCOL, byte_size(PayloadBin)),
2339+
PayloadSize = iolist_size(PayloadBin),
2340+
validate_message_size(PayloadSize, MaxMessageSize),
2341+
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
23412342

23422343
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23432344
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
@@ -3067,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) ->
30673068

30683069
validate_message_size(_, unlimited) ->
30693070
ok;
3070-
validate_message_size(Message, MaxMsgSize)
3071-
when is_integer(MaxMsgSize) ->
3072-
MsgSize = iolist_size(Message),
3071+
validate_message_size(MsgSize, MaxMsgSize)
3072+
when is_integer(MsgSize) ->
30733073
case MsgSize =< MaxMsgSize of
30743074
true ->
30753075
ok;
@@ -3083,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize)
30833083
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
30843084
"message size (~b bytes) > maximum message size (~b bytes)",
30853085
[MsgSize, MaxMsgSize])
3086-
end.
3086+
end;
3087+
validate_message_size(Msg, MaxMsgSize) ->
3088+
validate_message_size(iolist_size(Msg), MaxMsgSize).
30873089

30883090
-spec ensure_terminus(source | target,
30893091
term(),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -985,8 +985,7 @@ check_msg_size(Content, GCThreshold) ->
985985
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
986986
case Size =< MaxMessageSize of
987987
true ->
988-
rabbit_global_counters:message_size(amqp091, Size),
989-
ok;
988+
rabbit_msg_size_metrics:observe(amqp091, Size);
990989
false ->
991990
Fmt = case MaxMessageSize of
992991
?MAX_MSG_SIZE ->

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
boot_step/0,
1414
init/1,
1515
init/2,
16-
overview/0,
1716
prometheus_format/0,
1817
increase_protocol_counter/3,
1918
messages_received/2,
@@ -34,11 +33,14 @@
3433
publisher_deleted/1,
3534
consumer_created/1,
3635
consumer_deleted/1,
37-
message_size/2,
3836
messages_dead_lettered/4,
3937
messages_dead_lettered_confirmed/3
4038
]).
4139

40+
-ifdef(TEST).
41+
-export([overview/0]).
42+
-endif.
43+
4244
%% PROTOCOL COUNTERS:
4345
-define(MESSAGES_RECEIVED, 1).
4446
-define(MESSAGES_RECEIVED_CONFIRM, 2).
@@ -133,12 +135,14 @@
133135
boot_step() ->
134136
[begin
135137
%% Protocol counters
136-
init([{protocol, Proto}]),
138+
Protocol = {protocol, Proto},
139+
init([Protocol]),
140+
rabbit_msg_size_metrics:init(Proto),
137141

138142
%% Protocol & Queue Type counters
139-
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
140-
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
141-
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
143+
init([Protocol, {queue_type, rabbit_classic_queue}]),
144+
init([Protocol, {queue_type, rabbit_quorum_queue}]),
145+
init([Protocol, {queue_type, rabbit_stream_queue}])
142146
end || Proto <- [amqp091, amqp10]],
143147

144148
%% Dead Letter counters
@@ -187,21 +191,19 @@ init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
187191
init(Labels = [{protocol, Protocol}], Extra) ->
188192
_ = seshat:new_group(?MODULE),
189193
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
190-
persistent_term:put({?MODULE, Protocol}, Counters),
191-
rabbit_msg_size_metrics:init(Labels);
194+
persistent_term:put({?MODULE, Protocol}, Counters);
192195
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
193196
_ = seshat:new_group(?MODULE),
194197
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
195198
persistent_term:put({?MODULE, QueueType, DLS}, Counters).
196199

200+
-ifdef(TEST).
197201
overview() ->
198-
maps:merge_with(
199-
fun(_Key, Value1, Value2) -> maps:merge(Value1, Value2) end,
200-
rabbit_msg_size_metrics:overview(),
201-
seshat:overview(?MODULE)).
202+
seshat:overview(?MODULE).
203+
-endif.
202204

203205
prometheus_format() ->
204-
maps:merge(seshat:format(?MODULE), rabbit_msg_size_metrics:prometheus_format()).
206+
seshat:format(?MODULE).
205207

206208
increase_protocol_counter(Protocol, Counter, Num) ->
207209
counters:add(fetch(Protocol), Counter, Num).
@@ -252,16 +254,13 @@ publisher_created(Protocol) ->
252254
counters:add(fetch(Protocol), ?PUBLISHERS, 1).
253255

254256
publisher_deleted(Protocol) ->
255-
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
257+
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).
256258

257259
consumer_created(Protocol) ->
258260
counters:add(fetch(Protocol), ?CONSUMERS, 1).
259261

260262
consumer_deleted(Protocol) ->
261-
counters:add(fetch(Protocol), ?CONSUMERS, -1).
262-
263-
message_size(Protocol, MessageSize) ->
264-
rabbit_msg_size_metrics:update(Protocol, MessageSize).
263+
counters:sub(fetch(Protocol), ?CONSUMERS, 1).
265264

266265
messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
267266
Index = case Reason of

0 commit comments

Comments
 (0)