Skip to content

Commit e9eda70

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Replicate x_jms_topic_table Mnesia table
The x_jms_topic_table Mnesia table must be on all nodes for messages to be published to JMS topic exchanges and routed to topic subscribers. The table used to be only in RAM on one node, so it would be unavailable when the node was down and empty when it came back up, losing the state for subscribers still online because connected to other nodes. References #9005 (cherry picked from commit df9fec8) # Conflicts: # deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
1 parent 56cd419 commit e9eda70

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%% -----------------------------------------------------------------------------
7+
-module(rabbit_db_jms_exchange).
8+
9+
-include_lib("rabbit_common/include/rabbit.hrl").
10+
-include_lib("khepri/include/khepri.hrl").
11+
-include("rabbit_jms_topic_exchange.hrl").
12+
13+
-export([
14+
setup_schema/0,
15+
create_or_update/3,
16+
insert/2,
17+
get/1,
18+
delete/1,
19+
delete/3
20+
]).
21+
22+
-export([
23+
khepri_jms_topic_exchange_path/0,
24+
khepri_jms_topic_exchange_path/1
25+
]).
26+
27+
-rabbit_mnesia_tables_to_khepri_db(
28+
[{?JMS_TOPIC_TABLE, rabbit_db_jms_exchange_m2k_converter}]).
29+
30+
%% -------------------------------------------------------------------
31+
%% setup_schema()
32+
%% -------------------------------------------------------------------
33+
34+
setup_schema() ->
35+
rabbit_khepri:handle_fallback(
36+
#{mnesia => fun() -> setup_schema_in_mnesia() end,
37+
khepri => ok
38+
}).
39+
40+
setup_schema_in_mnesia() ->
41+
TableName = ?JMS_TOPIC_TABLE,
42+
rabbit_log:info(
43+
"Creating table ~ts for JMS topic exchange",
44+
[TableName]),
45+
_ = try
46+
rabbit_table:create(
47+
TableName,
48+
[{attributes, record_info(fields, ?JMS_TOPIC_RECORD)},
49+
{record_name, ?JMS_TOPIC_RECORD},
50+
{type, set}]),
51+
%% The JMS topic exchange table must be available on all nodes.
52+
%% If it existed on only one node, messages could not be published
53+
%% to JMS topic exchanges and routed to topic subscribers if the node
54+
%% was unavailable.
55+
%% The call below makes sure this node has a copy of the table.
56+
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
57+
ok ->
58+
%% Next, we try to fix other nodes in the cluster if they are
59+
%% running a version of RabbitMQ which does not replicate the
60+
%% table. All nodes must have a replica for Mnesia operations
61+
%% to work properly. Therefore the code below is to make older
62+
%% compatible with newer nodes.
63+
Replicas = mnesia:table_info(TableName, all_nodes),
64+
Members = rabbit_nodes:list_running(),
65+
MissingOn = Members -- Replicas,
66+
lists:foreach(
67+
fun(Node) ->
68+
%% Errors from adding a replica on those older nodes
69+
%% are ignored however. They should not be fatal. The
70+
%% problem will solve by itself once all nodes are
71+
%% upgraded.
72+
_ = rpc:call(
73+
Node,
74+
rabbit_table, ensure_table_copy,
75+
[TableName, Node, ram_copies])
76+
end, MissingOn),
77+
ok;
78+
Error ->
79+
Error
80+
end
81+
catch throw:Reason ->
82+
rabbit_log:error(
83+
"Failed to create JMS topic exchange table: ~tp",
84+
[Reason])
85+
end,
86+
ok.
87+
88+
%% -------------------------------------------------------------------
89+
%% create_or_update().
90+
%% -------------------------------------------------------------------
91+
92+
create_or_update(XName, BindingKeyAndFun, ErrorFun) ->
93+
rabbit_khepri:handle_fallback(
94+
#{mnesia =>
95+
fun() -> create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) end,
96+
khepri =>
97+
fun() -> update_in_khepri(XName, BindingKeyAndFun, fun put_item/2, ErrorFun) end
98+
}).
99+
100+
create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
101+
rabbit_mnesia:execute_mnesia_transaction(
102+
fun() ->
103+
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
104+
read_state_in_mnesia(XName, ErrorFun),
105+
write_state_fun_in_mnesia(XName, put_item(BindingFuns, BindingKeyAndFun))
106+
end).
107+
108+
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
109+
Path = khepri_jms_topic_exchange_path(XName),
110+
case rabbit_khepri:adv_get(Path) of
111+
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
112+
Path1 = khepri_path:combine_with_conditions(
113+
Path, [#if_payload_version{version = DVersion}]),
114+
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
115+
case Ret of
116+
ok -> ok;
117+
{error, {khepri, mismatching_node, _}} ->
118+
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun);
119+
{error, _} ->
120+
ErrorFun(XName)
121+
end;
122+
_Err ->
123+
ErrorFun(XName)
124+
end.
125+
126+
%% -------------------------------------------------------------------
127+
%% insert().
128+
%% -------------------------------------------------------------------
129+
130+
insert(XName, BFuns) ->
131+
rabbit_khepri:handle_fallback(
132+
#{mnesia => fun() -> insert_in_mnesia(XName, BFuns) end,
133+
khepri => fun() -> insert_in_khepri(XName, BFuns) end
134+
}).
135+
136+
insert_in_mnesia(XName, BFuns) ->
137+
rabbit_mnesia:execute_mnesia_transaction(
138+
fun() ->
139+
write_state_fun_in_mnesia(XName, BFuns)
140+
end).
141+
142+
insert_in_khepri(XName, BFuns) ->
143+
ok = rabbit_khepri:put(khepri_jms_topic_exchange_path(XName), BFuns).
144+
145+
%% -------------------------------------------------------------------
146+
%% get().
147+
%% -------------------------------------------------------------------
148+
149+
get(XName) ->
150+
rabbit_khepri:handle_fallback(
151+
#{mnesia => fun() -> get_in_mnesia(XName) end,
152+
khepri => fun() -> get_in_khepri(XName) end
153+
}).
154+
155+
get_in_mnesia(XName) ->
156+
mnesia:async_dirty(
157+
fun() ->
158+
case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of
159+
[#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns}] ->
160+
BindingFuns;
161+
_ ->
162+
not_found
163+
end
164+
end,
165+
[]
166+
).
167+
168+
get_in_khepri(XName) ->
169+
case rabbit_khepri:get(khepri_jms_topic_exchange_path(XName)) of
170+
{ok, BindingFuns} ->
171+
BindingFuns;
172+
_ ->
173+
not_found
174+
end.
175+
176+
%% -------------------------------------------------------------------
177+
%% delete().
178+
%% -------------------------------------------------------------------
179+
180+
delete(XName) ->
181+
rabbit_khepri:handle_fallback(
182+
#{mnesia => fun() -> delete_in_mnesia(XName) end,
183+
khepri => fun() -> delete_in_khepri(XName) end
184+
}).
185+
186+
delete_in_mnesia(XName) ->
187+
rabbit_mnesia:execute_mnesia_transaction(
188+
fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end).
189+
190+
delete_in_khepri(XName) ->
191+
rabbit_khepri:delete(khepri_jms_topic_exchange_path(XName)).
192+
193+
delete(XName, BindingKeys, ErrorFun) ->
194+
rabbit_khepri:handle_fallback(
195+
#{mnesia =>
196+
fun() -> delete_in_mnesia(XName, BindingKeys, ErrorFun) end,
197+
khepri =>
198+
fun() -> update_in_khepri(XName, BindingKeys, fun remove_items/2, ErrorFun) end
199+
}).
200+
201+
delete_in_mnesia(XName, BindingKeys, ErrorFun) ->
202+
rabbit_mnesia:execute_mnesia_transaction(
203+
fun() ->
204+
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
205+
read_state_in_mnesia(XName, ErrorFun),
206+
write_state_fun_in_mnesia(XName, remove_items(BindingFuns, BindingKeys))
207+
end).
208+
209+
read_state_in_mnesia(XName, ErrorFun) ->
210+
case mnesia:read(?JMS_TOPIC_TABLE, XName, write) of
211+
[Rec] -> Rec;
212+
_ -> ErrorFun(XName)
213+
end.
214+
215+
write_state_fun_in_mnesia(XName, BFuns) ->
216+
mnesia:write( ?JMS_TOPIC_TABLE
217+
, #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns}
218+
, write ).
219+
220+
%% -------------------------------------------------------------------
221+
%% dictionary handling
222+
%% -------------------------------------------------------------------
223+
224+
% add an item to the dictionary of binding functions
225+
put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict).
226+
227+
% remove a list of keyed items from the dictionary, by key
228+
remove_items(Dict, []) -> Dict;
229+
remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
230+
231+
%% -------------------------------------------------------------------
232+
%% paths
233+
%% -------------------------------------------------------------------
234+
235+
khepri_jms_topic_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
236+
[?MODULE, jms_topic_exchange, VHost, Name].
237+
238+
khepri_jms_topic_exchange_path() ->
239+
[?MODULE, jms_topic_exchange].

0 commit comments

Comments
 (0)