Skip to content

Commit 12ce1ed

Browse files
Merge pull request #10085 from rabbitmq/mergify/bp/v3.12.x/pr-10072
Usability improvements during definition import #10068 (backport #10072)
2 parents 145bd9f + 8985107 commit 12ce1ed

File tree

7 files changed

+163
-27
lines changed

7 files changed

+163
-27
lines changed

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 115 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@
8080
'bindings' |
8181
'exchanges'.
8282

83-
-type definition_object() :: #{binary() => any()}.
83+
-type definition_key() :: binary() | atom().
84+
-type definition_object() :: #{definition_key() => any()}.
8485
-type definition_list() :: [definition_object()].
8586

8687
-type definitions() :: #{
@@ -104,18 +105,73 @@ maybe_load_definitions() ->
104105
{error, E} -> {error, E}
105106
end.
106107

107-
validate_definitions(Defs) when is_list(Defs) ->
108+
-spec validate_parsing_of_doc(any()) -> boolean().
109+
validate_parsing_of_doc(Body) when is_binary(Body) ->
110+
case decode(Body) of
111+
{ok, _Map} -> true;
112+
{error, _Err} -> false
113+
end.
114+
115+
-spec validate_parsing_of_doc_collection(list(any())) -> boolean().
116+
validate_parsing_of_doc_collection(Defs) when is_list(Defs) ->
117+
lists:foldl(fun(_Body, false) ->
118+
false;
119+
(Body, true) ->
120+
case decode(Body) of
121+
{ok, _Map} -> true;
122+
{error, _Err} -> false
123+
end
124+
end, true, Defs).
125+
126+
-spec filter_orphaned_objects(definition_list()) -> definition_list().
127+
filter_orphaned_objects(Maps) ->
128+
lists:filter(fun(M) -> maps:get(<<"vhost">>, M, undefined) =:= undefined end, Maps).
129+
130+
-spec any_orphaned_objects(definition_list()) -> boolean().
131+
any_orphaned_objects(Maps) ->
132+
length(filter_orphaned_objects(Maps)) > 0.
133+
134+
-spec any_orphaned_in_doc(definitions()) -> boolean().
135+
any_orphaned_in_doc(DefsMap) ->
136+
any_orphaned_in_category(DefsMap, <<"queues">>)
137+
orelse any_orphaned_in_category(DefsMap, <<"exchanges">>)
138+
orelse any_orphaned_in_category(DefsMap, <<"bindings">>).
139+
140+
-spec any_orphaned_in_category(definitions(), definition_category() | binary()) -> boolean().
141+
any_orphaned_in_category(DefsMap, Category) ->
142+
%% try both binary and atom keys
143+
any_orphaned_objects(maps:get(Category, DefsMap,
144+
maps:get(rabbit_data_coercion:to_atom(Category), DefsMap, []))).
145+
146+
-spec validate_orphaned_objects_in_doc_collection(list() | binary()) -> boolean().
147+
validate_orphaned_objects_in_doc_collection(Defs) when is_list(Defs) ->
108148
lists:foldl(fun(_Body, false) ->
109-
false;
110-
(Body, true) ->
111-
case decode(Body) of
112-
{ok, _Map} -> true;
113-
{error, _Err} -> false
114-
end
115-
end, true, Defs);
149+
false;
150+
(Body, true) ->
151+
validate_parsing_of_doc(Body)
152+
end, true, Defs).
153+
154+
-spec validate_orphaned_objects_in_doc(binary()) -> boolean().
155+
validate_orphaned_objects_in_doc(Body) when is_binary(Body) ->
156+
case decode(Body) of
157+
{ok, DefsMap} ->
158+
AnyOrphaned = any_orphaned_in_doc(DefsMap),
159+
case AnyOrphaned of
160+
true ->
161+
log_an_error_about_orphaned_objects();
162+
false -> ok
163+
end,
164+
AnyOrphaned;
165+
{error, _Err} -> false
166+
end.
167+
168+
-spec validate_definitions(list(any()) | binary()) -> boolean().
169+
validate_definitions(Defs) when is_list(Defs) ->
170+
validate_parsing_of_doc_collection(Defs) andalso
171+
validate_orphaned_objects_in_doc_collection(Defs);
116172
validate_definitions(Body) when is_binary(Body) ->
117173
case decode(Body) of
118-
{ok, _Map} -> true;
174+
{ok, Defs} -> validate_orphaned_objects_in_doc(Defs);
119175
{error, _Err} -> false
120176
end.
121177

@@ -284,6 +340,7 @@ maybe_load_definitions_from_local_filesystem(App, Key) ->
284340
undefined -> ok;
285341
{ok, none} -> ok;
286342
{ok, Path} ->
343+
rabbit_log:debug("~ts.~ts is set to '~ts', will discover definition file(s) to import", [App, Key, Path]),
287344
IsDir = filelib:is_dir(Path),
288345
Mod = rabbit_definitions_import_local_filesystem,
289346
rabbit_log:debug("Will use module ~ts to import definitions", [Mod]),
@@ -409,6 +466,10 @@ should_skip_if_unchanged() ->
409466
ReachedTargetClusterSize = rabbit_nodes:reached_target_cluster_size(),
410467
OptedIn andalso ReachedTargetClusterSize.
411468

469+
log_an_error_about_orphaned_objects() ->
470+
rabbit_log:error("Definitions import: some queues, exchanges or bindings in the definition file "
471+
"are missing the virtual host field. Such files are produced when definitions of "
472+
"a single virtual host are exported. They cannot be used to import definitions at boot time").
412473

413474
-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok' | {error, term()}.
414475

@@ -424,6 +485,20 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
424485
apply_defs(Map, ActingUser, fun () -> ok end, VHost);
425486
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
426487
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
488+
489+
%% If any of the queues or exchanges do not have virtual hosts set,
490+
%% this definition file was a virtual-host specific import. They cannot be applied
491+
%% as "complete" definition imports, most notably, imported on boot.
492+
AnyOrphaned = any_orphaned_in_doc(Map),
493+
494+
case AnyOrphaned of
495+
true ->
496+
log_an_error_about_orphaned_objects(),
497+
throw({error, invalid_definitions_file});
498+
false ->
499+
ok
500+
end,
501+
427502
try
428503
concurrent_for_all(users, ActingUser, Map,
429504
fun(User, _Username) ->
@@ -587,8 +662,11 @@ do_concurrent_for_all(List, WorkPoolFun) ->
587662
fun() ->
588663
_ = try
589664
WorkPoolFun(M)
590-
catch {error, E} -> gatherer:in(Gatherer, {error, E});
591-
_:E -> gatherer:in(Gatherer, {error, E})
665+
catch {error, E} -> gatherer:in(Gatherer, {error, E});
666+
_:E:Stacktrace ->
667+
rabbit_log:debug("Definition import: a work pool operation has thrown an exception ~st, stacktrace: ~p",
668+
[E, Stacktrace]),
669+
gatherer:in(Gatherer, {error, E})
592670
end,
593671
gatherer:finish(Gatherer)
594672
end)
@@ -731,6 +809,10 @@ add_queue_int(_Queue, R = #resource{kind = queue,
731809
Name = R#resource.name,
732810
rabbit_log:warning("Skipping import of a queue whose name begins with 'amq.', "
733811
"name: ~ts, acting user: ~ts", [Name, ActingUser]);
812+
add_queue_int(_Queue, R = #resource{kind = queue, virtual_host = undefined}, ActingUser) ->
813+
Name = R#resource.name,
814+
rabbit_log:warning("Skipping import of a queue with an unset virtual host field, "
815+
"name: ~ts, acting user: ~ts", [Name, ActingUser]);
734816
add_queue_int(Queue, Name = #resource{virtual_host = VHostName}, ActingUser) ->
735817
case rabbit_amqqueue:exists(Name) of
736818
true ->
@@ -824,6 +906,7 @@ validate_limits(All) ->
824906
undefined -> ok;
825907
Queues0 ->
826908
{ok, VHostMap} = filter_out_existing_queues(Queues0),
909+
_ = rabbit_log:debug("Definition import. Virtual host map for validation: ~p", [VHostMap]),
827910
maps:fold(fun validate_vhost_limit/3, ok, VHostMap)
828911
end.
829912

@@ -848,19 +931,30 @@ filter_out_existing_queues(VHost, Queues) ->
848931

849932
build_queue_data(Queue) ->
850933
VHost = maps:get(<<"vhost">>, Queue, undefined),
851-
Rec = rv(VHost, queue, <<"name">>, Queue),
852-
{Rec, VHost}.
934+
case VHost of
935+
undefined -> undefined;
936+
Value ->
937+
Rec = rv(Value, queue, <<"name">>, Queue),
938+
{Rec, VHost}
939+
end.
853940

854941
build_filtered_map([], AccMap) ->
855942
{ok, AccMap};
856943
build_filtered_map([Queue|Rest], AccMap0) ->
857-
{Rec, VHost} = build_queue_data(Queue),
858-
case rabbit_amqqueue:exists(Rec) of
859-
false ->
860-
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
861-
build_filtered_map(Rest, AccMap1);
862-
true ->
863-
build_filtered_map(Rest, AccMap0)
944+
%% If virtual host is not specified in a queue,
945+
%% this definition file is likely virtual host-specific.
946+
%%
947+
%% Skip such queues.
948+
case build_queue_data(Queue) of
949+
undefined -> build_filtered_map(Rest, AccMap0);
950+
{Rec, VHost} when VHost =/= undefined ->
951+
case rabbit_amqqueue:exists(Rec) of
952+
false ->
953+
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
954+
build_filtered_map(Rest, AccMap1);
955+
true ->
956+
build_filtered_map(Rest, AccMap0)
957+
end
864958
end.
865959

866960
validate_vhost_limit(VHost, AddCount, ok) ->

deps/rabbit/src/rabbit_definitions_import_local_filesystem.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ load_with_hashing(IsDir, Path, PreviousHash, Algo) when is_boolean(IsDir) ->
9494
Other
9595
end;
9696
false ->
97-
rabbit_log:error("Failed to parse a definition file, path: ~p", [Path]),
97+
rabbit_log:error("Definitions file at path ~p failed validation. The file must be a valid JSON document "
98+
"and all virtual host-scoped resources must have a virtual host field to be set. "
99+
"Definition files exported for a single virtual host CANNOT be imported at boot time", [Path]),
98100
{error, not_json}
99101
end
100102
end.

deps/rabbit/test/definition_import_SUITE.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ groups() ->
5353
import_case17,
5454
import_case18,
5555
import_case19,
56-
import_case20
56+
import_case20,
57+
import_case21
5758
]},
5859

5960
{boot_time_import_using_classic_source, [], [
@@ -305,6 +306,8 @@ import_case20(Config) ->
305306
{skip, "Should not run in mixed version environments"}
306307
end.
307308

309+
import_case21(Config) -> import_invalid_file_case(Config, "failing_case21").
310+
308311
export_import_round_trip_case1(Config) ->
309312
case rabbit_ct_helpers:is_mixed_versions() of
310313
false ->
@@ -386,7 +389,10 @@ import_file_case(Config, Subdirectory, CaseName) ->
386389

387390
import_invalid_file_case(Config, CaseName) ->
388391
CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"),
389-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath]),
392+
try
393+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath])
394+
catch _:_:_ -> ok
395+
end,
390396
ok.
391397

392398
import_invalid_file_case_if_unchanged(Config, CaseName) ->
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"rabbit_version": "3.12.8",
3+
"parameters": [],
4+
"policies": [],
5+
"queues": [
6+
{
7+
"name": "qq.1",
8+
"durable": true,
9+
"auto_delete": false,
10+
"arguments": {
11+
"x-queue-type": "quorum"
12+
}
13+
},
14+
{
15+
"name": "cq.1",
16+
"durable": true,
17+
"auto_delete": false,
18+
"arguments": {
19+
"x-queue-type": "classic"
20+
}
21+
},
22+
{
23+
"name": "sq.1",
24+
"durable": true,
25+
"auto_delete": false,
26+
"arguments": {
27+
"x-queue-type": "stream"
28+
}
29+
}
30+
],
31+
"exchanges": [],
32+
"bindings": []
33+
}

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/import_definitions_command.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
121121
false ->
122122
{:ok,
123123
"Successfully started definition import. " <>
124-
"This process is asynchronous and can take some time.\n"}
124+
"This process is asynchronous and can take some time. Watch target node logs for completion.\n"}
125125
end
126126
end
127127

deps/rabbitmq_management/src/rabbit_mgmt_load_definitions.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
%% Definition import functionality is now a core server feature.
1616

1717
boot() ->
18-
rabbit_log:debug("Will import definitions file from management.load_definitions"),
1918
rabbit_definitions:maybe_load_definitions(rabbitmq_management, load_definitions).
2019

2120
maybe_load_definitions() ->

deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,14 @@ accept_json(ReqData0, Context) ->
9595
end.
9696

9797
vhost_definitions(ReqData, VHost, Context) ->
98-
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available
98+
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available.
99+
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
99100
Xs = [strip_vhost(X) || X <- rabbit_mgmt_wm_exchanges:basic(ReqData),
100101
export_exchange(X)],
101102
VQs = [Q || Q <- rabbit_mgmt_wm_queues:basic(ReqData), export_queue(Q)],
102103
Qs = [strip_vhost(Q) || Q <- VQs],
103104
QNames = [{pget(name, Q), pget(vhost, Q)} || Q <- VQs],
105+
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
104106
Bs = [strip_vhost(B) || B <- rabbit_mgmt_wm_bindings:basic(ReqData),
105107
export_binding(B, QNames)],
106108
{ok, Vsn} = application:get_key(rabbit, vsn),

0 commit comments

Comments
 (0)