Skip to content

Commit e832597

Browse files
michaelklishinmergify[bot]
authored andcommitted
Another take at #10068
Scan queues, exchanges and bindings before attempting to import anything on boot. If they miss the virtual host field, fail early and log a sensible message. (cherry picked from commit 62fffb6)
1 parent 145bd9f commit e832597

File tree

4 files changed

+82
-8
lines changed

4 files changed

+82
-8
lines changed

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,13 @@ should_skip_if_unchanged() ->
409409
ReachedTargetClusterSize = rabbit_nodes:reached_target_cluster_size(),
410410
OptedIn andalso ReachedTargetClusterSize.
411411

412+
-spec any_orphaned_objects(list(#{atom() => any()})) -> boolean().
413+
any_orphaned_objects(Maps) ->
414+
Filtered = lists:filter(fun(M) ->
415+
maps:get(<<"vhost">>, M, undefined) =:= undefined
416+
end, Maps),
417+
length(Filtered) > 0.
418+
412419

413420
-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok' | {error, term()}.
414421

@@ -424,6 +431,25 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
424431
apply_defs(Map, ActingUser, fun () -> ok end, VHost);
425432
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
426433
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
434+
435+
%% If any of the queues or exchanges do not have virtual hosts set,
436+
%% this definition file was a virtual-host specific import. They cannot be applied
437+
%% as "complete" definition imports, most notably, imported on boot.
438+
HasQueuesWithoutVirtualHostField = any_orphaned_objects(maps:get(queues, Map, [])),
439+
HasExchangesWithoutVirtualHostField = any_orphaned_objects(maps:get(exchanges, Map, [])),
440+
HasBindingsWithoutVirtualHostField = any_orphaned_objects(maps:get(bindings, Map, [])),
441+
442+
case (HasQueuesWithoutVirtualHostField and HasExchangesWithoutVirtualHostField and HasBindingsWithoutVirtualHostField) of
443+
true ->
444+
rabbit_log:error("Definitions import: some queues, exchanges or bindings in the definition file "
445+
"are missing the virtual host field. Such files are produced when definitions of "
446+
"a single virtual host are exported. They cannot be used to import definitions at boot time",
447+
[]),
448+
throw({error, invalid_definitions_file});
449+
false ->
450+
ok
451+
end,
452+
427453
try
428454
concurrent_for_all(users, ActingUser, Map,
429455
fun(User, _Username) ->
@@ -824,6 +850,7 @@ validate_limits(All) ->
824850
undefined -> ok;
825851
Queues0 ->
826852
{ok, VHostMap} = filter_out_existing_queues(Queues0),
853+
_ = rabbit_log:debug("Definition import. Virtual host map for validation: ~p", [VHostMap]),
827854
maps:fold(fun validate_vhost_limit/3, ok, VHostMap)
828855
end.
829856

@@ -855,12 +882,21 @@ build_filtered_map([], AccMap) ->
855882
{ok, AccMap};
856883
build_filtered_map([Queue|Rest], AccMap0) ->
857884
{Rec, VHost} = build_queue_data(Queue),
858-
case rabbit_amqqueue:exists(Rec) of
859-
false ->
860-
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
861-
build_filtered_map(Rest, AccMap1);
862-
true ->
863-
build_filtered_map(Rest, AccMap0)
885+
%% If virtual host is not specified in a queue,
886+
%% this definition file is likely virtual host-specific.
887+
%%
888+
%% Skip such queues.
889+
case VHost of
890+
undefined ->
891+
build_filtered_map(Rest, AccMap0);
892+
_Other ->
893+
case rabbit_amqqueue:exists(Rec) of
894+
false ->
895+
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
896+
build_filtered_map(Rest, AccMap1);
897+
true ->
898+
build_filtered_map(Rest, AccMap0)
899+
end
864900
end.
865901

866902
validate_vhost_limit(VHost, AddCount, ok) ->

deps/rabbit/test/definition_import_SUITE.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ groups() ->
5353
import_case17,
5454
import_case18,
5555
import_case19,
56-
import_case20
56+
import_case20,
57+
import_case21
5758
]},
5859

5960
{boot_time_import_using_classic_source, [], [
@@ -305,6 +306,8 @@ import_case20(Config) ->
305306
{skip, "Should not run in mixed version environments"}
306307
end.
307308

309+
import_case21(Config) -> import_invalid_file_case(Config, "failing_case21").
310+
308311
export_import_round_trip_case1(Config) ->
309312
case rabbit_ct_helpers:is_mixed_versions() of
310313
false ->
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"rabbit_version": "3.12.8",
3+
"parameters": [],
4+
"policies": [],
5+
"queues": [
6+
{
7+
"name": "qq.1",
8+
"durable": true,
9+
"auto_delete": false,
10+
"arguments": {
11+
"x-queue-type": "quorum"
12+
}
13+
},
14+
{
15+
"name": "cq.1",
16+
"durable": true,
17+
"auto_delete": false,
18+
"arguments": {
19+
"x-queue-type": "classic"
20+
}
21+
},
22+
{
23+
"name": "sq.1",
24+
"durable": true,
25+
"auto_delete": false,
26+
"arguments": {
27+
"x-queue-type": "stream"
28+
}
29+
}
30+
],
31+
"exchanges": [],
32+
"bindings": []
33+
}

deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,14 @@ accept_json(ReqData0, Context) ->
9595
end.
9696

9797
vhost_definitions(ReqData, VHost, Context) ->
98-
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available
98+
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available.
99+
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
99100
Xs = [strip_vhost(X) || X <- rabbit_mgmt_wm_exchanges:basic(ReqData),
100101
export_exchange(X)],
101102
VQs = [Q || Q <- rabbit_mgmt_wm_queues:basic(ReqData), export_queue(Q)],
102103
Qs = [strip_vhost(Q) || Q <- VQs],
103104
QNames = [{pget(name, Q), pget(vhost, Q)} || Q <- VQs],
105+
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
104106
Bs = [strip_vhost(B) || B <- rabbit_mgmt_wm_bindings:basic(ReqData),
105107
export_binding(B, QNames)],
106108
{ok, Vsn} = application:get_key(rabbit, vsn),

0 commit comments

Comments
 (0)