Skip to content

Commit 9a02a9a

Browse files
committed
Replicate x_jms_topic_table Mnesia table (backport #11087)
The x_jms_topic_table Mnesia table must be on all nodes for messages to be published to JMS topic exchanges and routed to topic subscribers. The table used to be only in RAM on one node, so it would be unavailable when the node was down and empty when it came back up, losing the state for subscribers still online because connected to other nodes. Inspired by a similar change for the node maintenance status table in #9005.
1 parent 6763632 commit 9a02a9a

File tree

3 files changed

+63
-49
lines changed

3 files changed

+63
-49
lines changed

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -58,47 +58,14 @@ boot() ->
5858
rabbit_log:info(
5959
"Creating table ~s for maintenance mode status",
6060
[TableName]),
61-
try
62-
rabbit_table:create(
63-
TableName,
64-
status_table_definition()),
65-
%% The `rabbit_node_maintenance_states' table used to be global but not
66-
%% replicated. This leads to various errors during RabbitMQ boot or
67-
%% operations on the Mnesia database. The reason is the table existed
68-
%% on a single node and, if that node was stopped or MIA, other nodes
69-
%% may wait forever on that node for the table to be available.
70-
%%
71-
%% The call below makes sure this node has a copy of the table.
72-
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
73-
ok ->
74-
%% Next, we try to fix other nodes in the cluster if they are
75-
%% running a version of RabbitMQ which does not replicate the
76-
%% table. All nodes must have a replica for Mnesia operations
77-
%% to work properly. Therefore the code below is to make older
78-
%% compatible with newer nodes.
79-
Replicas = mnesia:table_info(TableName, all_nodes),
80-
Members = rabbit_nodes:list_running(),
81-
MissingOn = Members -- Replicas,
82-
lists:foreach(
83-
fun(Node) ->
84-
%% Errors from adding a replica on those older nodes
85-
%% are ignored however. They should not be fatal. The
86-
%% problem will solve by itself once all nodes are
87-
%% upgraded.
88-
_ = rpc:call(
89-
Node,
90-
rabbit_table, ensure_table_copy,
91-
[TableName, Node, ram_copies])
92-
end, MissingOn),
93-
ok;
94-
Error ->
95-
Error
96-
end
97-
catch throw:Reason ->
98-
rabbit_log:error(
99-
"Failed to create maintenance status table: ~p",
100-
[Reason])
101-
end.
61+
%% The `rabbit_node_maintenance_states' table used to be global but not
62+
%% replicated. This leads to various errors during RabbitMQ boot or
63+
%% operations on the Mnesia database. The reason is the table existed
64+
%% on a single node and, if that node was stopped or MIA, other nodes
65+
%% may wait forever on that node for the table to be available.
66+
rabbit_table:create_and_replicate_table(
67+
TableName,
68+
status_table_definition()).
10269

10370
%%
10471
%% API

deps/rabbit/src/rabbit_table.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-export([
1111
create/0, create/2, ensure_local_copies/1, ensure_table_copy/3,
12+
create_and_replicate_table/2,
1213
create_local_copy/2, wait_for_replicated/1, wait/1, wait/2,
1314
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
1415
check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
@@ -76,6 +77,7 @@ ensure_secondary_index(Table, Field) ->
7677

7778
%% mnesia:table() and mnesia:storage_type() are not exported
7879
-type mnesia_table() :: atom().
80+
-type mnesia_table_definition() :: list().
7981
-type mnesia_storage_type() :: 'ram_copies' | 'disc_copies' | 'disc_only_copies'.
8082

8183
-spec ensure_table_copy(mnesia_table(), node(), mnesia_storage_type()) ->
@@ -89,6 +91,44 @@ ensure_table_copy(TableName, Node, StorageType) ->
8991
{aborted, Reason} -> {error, Reason}
9092
end.
9193

94+
-spec create_and_replicate_table(mnesia_table(), mnesia_table_definition()) ->
95+
ok | {error, any()}.
96+
create_and_replicate_table(TableName, TableDefinition) ->
97+
try
98+
rabbit_table:create(TableName, TableDefinition),
99+
%% The call below makes sure this node has a copy of the table.
100+
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
101+
ok ->
102+
%% Next, we try to fix other nodes in the cluster if they are
103+
%% running a version of RabbitMQ which does not replicate the
104+
%% table. All nodes must have a replica for Mnesia operations
105+
%% to work properly. Therefore the code below is to make older
106+
%% compatible with newer nodes.
107+
Replicas = mnesia:table_info(TableName, all_nodes),
108+
Members = rabbit_nodes:list_running(),
109+
MissingOn = Members -- Replicas,
110+
lists:foreach(
111+
fun(Node) ->
112+
%% Errors from adding a replica on those older nodes
113+
%% are ignored however. They should not be fatal. The
114+
%% problem will solve by itself once all nodes are
115+
%% upgraded.
116+
_ = rpc:call(
117+
Node,
118+
rabbit_table, ensure_table_copy,
119+
[TableName, Node, ram_copies])
120+
end, MissingOn),
121+
ok;
122+
Error ->
123+
Error
124+
end
125+
catch throw:Reason ->
126+
rabbit_log:error(
127+
"Failed to create ~tp table: ~tp",
128+
[TableName, Reason])
129+
end.
130+
131+
92132
%% This arity only exists for backwards compatibility with certain
93133
%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
94134

deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,21 @@
5555

5656
% Initialise database table for all exchanges of type <<"x-jms-topic">>
5757
setup_db_schema() ->
58-
case mnesia:create_table( ?JMS_TOPIC_TABLE
59-
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
60-
, {record_name, ?JMS_TOPIC_RECORD}
61-
, {type, set} ]
62-
) of
63-
{atomic, ok} -> ok;
64-
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
65-
end.
58+
TableName = ?JMS_TOPIC_TABLE,
59+
TableDefinition = [{attributes, record_info(fields, ?JMS_TOPIC_RECORD)},
60+
{record_name, ?JMS_TOPIC_RECORD},
61+
{type, set}],
62+
rabbit_log:info(
63+
"Creating table ~ts for JMS topic exchange",
64+
[TableName]),
65+
%% The JMS topic exchange table must be available on all nodes.
66+
%% If it existed on only one node, messages could not be published
67+
%% to JMS topic exchanges and routed to topic subscribers if the node
68+
%% was unavailable.
69+
_ = rabbit_table:create_and_replicate_table(
70+
TableName,
71+
TableDefinition),
72+
ok.
6673

6774
%%----------------------------------------------------------------------------
6875
%% R E F E R E N C E T Y P E I N F O R M A T I O N

0 commit comments

Comments
 (0)