Skip to content

Commit df9fec8

Browse files
committed
Replicate x_jms_topic_table Mnesia table
The x_jms_topic_table Mnesia table must be on all nodes for messages to be published to JMS topic exchanges and routed to topic subscribers. The table used to be only in RAM on one node, so it would be unavailable when the node was down and empty when it came back up, losing the state for subscribers still online because connected to other nodes. References #9005
1 parent 57f9aec commit df9fec8

File tree

1 file changed

+45
-8
lines changed

1 file changed

+45
-8
lines changed

deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,51 @@ setup_schema() ->
3838
}).
3939

4040
setup_schema_in_mnesia() ->
41-
case mnesia:create_table( ?JMS_TOPIC_TABLE
42-
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
43-
, {record_name, ?JMS_TOPIC_RECORD}
44-
, {type, set} ]
45-
) of
46-
{atomic, ok} -> ok;
47-
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
48-
end,
41+
TableName = ?JMS_TOPIC_TABLE,
42+
rabbit_log:info(
43+
"Creating table ~ts for JMS topic exchange",
44+
[TableName]),
45+
_ = try
46+
rabbit_table:create(
47+
TableName,
48+
[{attributes, record_info(fields, ?JMS_TOPIC_RECORD)},
49+
{record_name, ?JMS_TOPIC_RECORD},
50+
{type, set}]),
51+
%% The JMS topic exchange table must be available on all nodes.
52+
%% If it existed on only one node, messages could not be published
53+
%% to JMS topic exchanges and routed to topic subscribers if the node
54+
%% was unavailable.
55+
%% The call below makes sure this node has a copy of the table.
56+
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
57+
ok ->
58+
%% Next, we try to fix other nodes in the cluster if they are
59+
%% running a version of RabbitMQ which does not replicate the
60+
%% table. All nodes must have a replica for Mnesia operations
61+
%% to work properly. Therefore the code below is to make older
62+
%% compatible with newer nodes.
63+
Replicas = mnesia:table_info(TableName, all_nodes),
64+
Members = rabbit_nodes:list_running(),
65+
MissingOn = Members -- Replicas,
66+
lists:foreach(
67+
fun(Node) ->
68+
%% Errors from adding a replica on those older nodes
69+
%% are ignored however. They should not be fatal. The
70+
%% problem will solve by itself once all nodes are
71+
%% upgraded.
72+
_ = rpc:call(
73+
Node,
74+
rabbit_table, ensure_table_copy,
75+
[TableName, Node, ram_copies])
76+
end, MissingOn),
77+
ok;
78+
Error ->
79+
Error
80+
end
81+
catch throw:Reason ->
82+
rabbit_log:error(
83+
"Failed to create JMS topic exchange table: ~tp",
84+
[Reason])
85+
end,
4986
ok.
5087

5188
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)