Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ APPS_DIR := $(CURDIR)/apps
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper

PLT_APPS += mnesia
Expand All @@ -150,6 +150,7 @@ dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris master
# TODO: Use systemd from Hex.pm, once there is a new post-0.6.0 release.
dep_systemd = git https://github.com/hauleth/erlang-systemd e732727b0b637eb29e8adc77a4eb46d7ebc0f41a
dep_seshat = git https://github.com/rabbitmq/seshat main

define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_messages_counters,
[{description, "messages metrics storage"},
{mfa, {rabbit_messages_counters, init,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).

%% -rabbit_boot_step({rabbit_stream_coordinator,
%% [{description, "stream queues coordinator"},
%% {mfa, {rabbit_stream_coordinator, start,
Expand Down
7 changes: 0 additions & 7 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2105,13 +2105,6 @@ notify_limiter(Limiter, Acked) ->
end
end.

deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
_RoutedToQs = []}, State) -> %% optimisation
?INCR_STATS(exchange_stats, XName, 1, publish, State),
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
mandatory = Mandatory,
Expand Down
202 changes: 202 additions & 0 deletions deps/rabbit/src/rabbit_messages_counters.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_messages_counters).
-on_load(init/0).

-export([
init/0,
new/3,
messages_published/3,
messages_routed/3,
messages_delivered_consume_ack/3,
messages_delivered_consume_autoack/3,
messages_delivered_get_ack/3,
messages_delivered_get_autoack/3,
messages_redelivered/3,
basic_get_empty/3,
messages_unroutable_dropped/3,
messages_unroutable_returned/3,
messages_confirmed/3
]).


-define(MESSAGES_PUBLISHED, 1).
-define(MESSAGES_ROUTED, 2).
-define(MESSAGES_DELIVERED_CONSUME_ACK, 3).
-define(MESSAGES_DELIVERED_CONSUME_AUTOACK, 4).
-define(MESSAGES_DELIVERED_GET_ACK, 5).
-define(MESSAGES_DELIVERED_GET_AUTOACK, 6).
-define(MESSAGES_REDELIVERED, 7).
-define(BASIC_GET_EMPTY, 8).
-define(MESSAGES_UNROUTABLE_DROPPED, 9).
-define(MESSAGES_UNROUTABLE_RETURNED, 10).
-define(MESSAGES_CONFIRMED, 11).

-define(COUNTERS,
[
{
messages_published_total, ?MESSAGES_PUBLISHED, counter,
"Total number of messages published to queues and streams"
},
{
messages_routed_total, ?MESSAGES_ROUTED, counter,
"Total number of messages routed to queues"
},
{
messages_delivered_consume_ack_total, ?MESSAGES_DELIVERED_CONSUME_ACK, counter,
"Total number of messages consumed using basic.consume with manual acknowledgment"
},
{
messages_delivered_consume_autoack_total, ?MESSAGES_DELIVERED_CONSUME_AUTOACK, counter,
"Total number of messages consumed using basic.consume with automatic acknowledgment"
},
{
messages_delivered_get_ack_total, ?MESSAGES_DELIVERED_GET_ACK, counter,
"Total number of messages consumed using basic.get with manual acknowledgment"
},
{
messages_delivered_get_autoack_total, ?MESSAGES_DELIVERED_GET_AUTOACK, counter,
"Total number of messages consumed using basic.get with automatic acknowledgment"
},
{
messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
"Total number of messages redelivered to consumers"
},
{
basic_get_empty_total, ?BASIC_GET_EMPTY, counter,
"Total number of times basic.get operations fetched no message"
},
{
messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
"Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
},
{
messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
"Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
},
{
messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
"Total number of messages confirmed to publishers"
}
]).

init() ->
seshat_counters:new_group(queue),
seshat_counters:new_group(global),
new(global, global, []),
ok.

new(Group, Object, Fields) ->
%% Some object could have extra metrics, i.e. queue types might have their own counters
seshat_counters:new(Group, Object, ?COUNTERS ++ Fields).

% TODO - these are received by queues, not from clients (doesn't account for unroutable)
messages_published(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_PUBLISHED, Num).

% formerly known as queue_messages_published_total
messages_routed(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_ROUTED, Num).

messages_delivered_consume_ack(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_ACK, Num).

messages_delivered_consume_autoack(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_AUTOACK, Num).

messages_delivered_get_ack(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_DELIVERED_GET_ACK, Num).

messages_delivered_get_autoack(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_DELIVERED_GET_AUTOACK, Num).

% not implemented yet
messages_redelivered(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_REDELIVERED, Num).

basic_get_empty(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?BASIC_GET_EMPTY, Num).

% implemented in rabbit_core_metrics (it doesn't reach a queue)
messages_unroutable_returned(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_UNROUTABLE_RETURNED, Num).

% implemented in rabbit_core_metrics (it doesn't reach a queue)
messages_unroutable_dropped(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_UNROUTABLE_DROPPED, Num).

messages_confirmed(Group, Object, Num) ->
counters:add(seshat_counters:fetch(Group, Object), ?MESSAGES_CONFIRMED, Num).

% TODO
% channel_messages_redelivered_total "Total number of messages redelivered to consumers"
%
% connection_incoming_bytes_total "Total number of bytes received on a connection"
% connection_outgoing_bytes_total "Total number of bytes sent on a connection"
% connection_process_reductions_total "Total number of connection process reductions"
% connection_incoming_packets_total "Total number of packets received on a connection"
% connection_outgoing_packets_total "Total number of packets sent on a connection"
%
% io_read_ops_total "Total number of I/O read operations"
% io_read_bytes_total "Total number of I/O bytes read"
% io_write_ops_total "Total number of I/O write operations"
% io_write_bytes_total "Total number of I/O bytes written"
% io_sync_ops_total "Total number of I/O sync operations"
% io_seek_ops_total "Total number of I/O seek operations"
% io_open_attempt_ops_total "Total number of file open attempts"
% io_reopen_ops_total "Total number of times files have been reopened"
%
% schema_db_ram_tx_total "Total number of Schema DB memory transactions"
% schema_db_disk_tx_total "Total number of Schema DB disk transactions"
% msg_store_read_total "Total number of Message Store read operations"
% msg_store_write_total "Total number of Message Store write operations"
% queue_index_read_ops_total "Total number of Queue Index read operations"
% queue_index_write_ops_total "Total number of Queue Index write operations"
% queue_index_journal_write_ops_total "Total number of Queue Index Journal write operations"
% io_read_time_seconds_total "Total I/O read time"
% io_write_time_seconds_total "Total I/O write time"
% io_sync_time_seconds_total "Total I/O sync time"
% io_seek_time_seconds_total "Total I/O seek time"
% io_open_attempt_time_seconds_total "Total file open attempts time"
% raft_term_total "Current Raft term number"
% queue_disk_reads_total "Total number of times queue read messages from disk"
% queue_disk_writes_total "Total number of times queue wrote messages to disk"

% DONE
% channel_messages_published_total "Total number of messages published into an exchange on a channel"
% channel_messages_confirmed_total "Total number of messages published into an exchange and confirmed on the channel"
% channel_messages_unroutable_returned_total "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
% channel_messages_unroutable_dropped_total "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
% channel_get_empty_total "Total number of times basic.get operations fetched no message"
% channel_get_ack_total "Total number of messages fetched with basic.get in manual acknowledgement mode"
% channel_get_total "Total number of messages fetched with basic.get in automatic acknowledgement mode"
% channel_messages_delivered_ack_total "Total number of messages delivered to consumers in manual acknowledgement mode"
% channel_messages_delivered_total "Total number of messages delivered to consumers in automatic acknowledgement mode"
% queue_messages_published_total "Total number of messages published to queues"

% IGNORED (IS THIS USEFUL?)
% channel_process_reductions_total "Total number of channel process reductions"
% queue_process_reductions_total "Total number of queue process reductions"

% NOT NECESSARY (DON'T GO TO ZERO)
% erlang_gc_runs_total "Total number of Erlang garbage collector runs"
% erlang_gc_reclaimed_bytes_total "Total number of bytes of memory reclaimed by Erlang garbage collector"
% erlang_scheduler_context_switches_total "Total number of Erlang scheduler context switches"
% connections_opened_total "Total number of connections opened"
% connections_closed_total "Total number of connections closed or terminated"
% channels_opened_total "Total number of channels opened"
% channels_closed_total "Total number of channels closed"
% queues_declared_total "Total number of queues declared"
% queues_created_total "Total number of queues created"
% queues_deleted_total "Total number of queues deleted"
% auth_attempts_total "Total number of authorization attempts"
% auth_attempts_succeeded_total "Total number of successful authentication attempts"
% auth_attempts_failed_total "Total number of failed authentication attempts"
% auth_attempts_detailed_total "Total number of authorization attempts with source info"
% auth_attempts_detailed_succeeded_total "Total number of successful authorization attempts with source info"
% auth_attempts_detailed_failed_total "Total number of failed authorization attempts with source info"
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_osiris_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

handle_info(tick, #state{timeout = Timeout} = State) ->
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
maps:map(
fun ({osiris_writer, QName}, #{offset := Offs,
first_offset := FstOffs}) ->
Expand Down
80 changes: 53 additions & 27 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(rabbit_queue_type).
-include("amqqueue.hrl").
-include_lib("rabbit_common/include/resource.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([
init/0,
Expand Down Expand Up @@ -449,34 +449,55 @@ deliver(Qs, Delivery, State) ->
end.

deliver0(Qs, Delivery, stateless) ->
_ = lists:map(fun(Q) ->
Mod = amqqueue:get_type(Q),
_ = Mod:deliver([{Q, stateless}], Delivery)
end, Qs),
case Qs of
[] ->
case Delivery#delivery.mandatory of
false -> rabbit_messages_counters:messages_unroutable_dropped(global, global, 1);
true -> rabbit_messages_counters:messages_unroutable_returned(global, global, 1)
end;
_ ->
rabbit_messages_counters:messages_routed(global, global, 1),
_ = lists:map(fun(Q) ->
Mod = amqqueue:get_type(Q),
_ = Mod:deliver([{Q, stateless}], Delivery)
end, Qs),
rabbit_messages_counters:messages_published(global, global, length(Qs))
end,
{ok, stateless, []};
deliver0(Qs, Delivery, #?STATE{} = State0) ->
%% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(
fun (Q, Acc) ->
T = amqqueue:get_type(Q),
Ctx = get_ctx(Q, State0),
maps:update_with(
T, fun (A) ->
[{Q, Ctx#ctx.state} | A]
end, [{Q, Ctx#ctx.state}], Acc)
end, #{}, Qs),
%%% dispatch each group to queue type interface?
{Xs, Actions} = maps:fold(fun(Mod, QSs, {X0, A0}) ->
{X, A} = Mod:deliver(QSs, Delivery),
{X0 ++ X, A0 ++ A}
end, {[], []}, ByType),
State = lists:foldl(
fun({Q, S}, Acc) ->
Ctx = get_ctx_with(Q, Acc, S),
set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
end, State0, Xs),
return_ok(State, Actions).
case Qs of
[] ->
case Delivery#delivery.mandatory of
false -> rabbit_messages_counters:messages_unroutable_dropped(global, global, 1);
true -> rabbit_messages_counters:messages_unroutable_returned(global, global, 1)
end,
return_ok(State0, []);
_ ->
rabbit_messages_counters:messages_routed(global, global, 1),
%% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(
fun (Q, Acc) ->
T = amqqueue:get_type(Q),
Ctx = get_ctx(Q, State0),
maps:update_with(
T, fun (A) ->
[{Q, Ctx#ctx.state} | A]
end, [{Q, Ctx#ctx.state}], Acc)
end, #{}, Qs),
%%% dispatch each group to queue type interface?
{Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) ->
{X, A} = Mod:deliver(QTSs, Delivery),
rabbit_messages_counters:messages_published(global, global, length(Qs)),
{X0 ++ X, A0 ++ A}
end, {[], []}, ByType),
State = lists:foldl(
fun({Q, S}, Acc) ->
Ctx = get_ctx_with(Q, Acc, S),
set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
end, State0, Xs),
return_ok(State, Actions)
end.


-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(),
Expand Down Expand Up @@ -518,8 +539,13 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Mod = amqqueue:get_type(Q),
case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of
{ok, Num, Msg, State} ->
case NoAck of
false -> rabbit_messages_counters:messages_delivered_get_ack(global, global, 1);
true -> rabbit_messages_counters:messages_delivered_get_autoack(global, global, 1)
end,
{ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{empty, State} ->
rabbit_messages_counters:basic_get_empty(global, global, 1),
{empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{error, _} = Err ->
Err;
Expand Down
7 changes: 4 additions & 3 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ create_stream(Q0, Node) ->
{arguments, Arguments},
{user_who_performed_action,
ActingUser}]),
rabbit_messages_counters:new(queue, QName, []),
{new, Q};
Error ->

Expand Down Expand Up @@ -499,7 +500,7 @@ i(committed_offset, Q) ->
%% TODO should it be on a metrics table?
%% The queue could be removed between the list() and this call
%% to retrieve the overview. Let's default to '' if it's gone.
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
maps:get(committed_offset,
maps:get({osiris_writer, amqqueue:get_name(Q)}, Data, #{}), '');
i(policy, Q) ->
Expand Down Expand Up @@ -568,7 +569,7 @@ get_counters(Q) ->
lists:filter(fun (X) -> X =/= undefined end, Counters).

safe_get_overview(Node) ->
case rpc:call(Node, osiris_counters, overview, []) of
case rpc:call(Node, seshat_counters, overview, [osiris]) of
{badrpc, _} ->
#{node => Node};
Data ->
Expand Down Expand Up @@ -613,7 +614,7 @@ tracking_status(Vhost, QueueName) ->

readers(QName) ->
try
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
Readers = case maps:get({osiris_writer, QName}, Data, not_found) of
not_found ->
maps:get(readers, maps:get({osiris_replica, QName}, Data, #{}), 0);
Expand Down
Loading