Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ should_skip_if_unchanged() ->
ReachedTargetClusterSize = rabbit_nodes:reached_target_cluster_size(),
OptedIn andalso ReachedTargetClusterSize.

-spec any_orphaned_objects(list(#{atom() => any()})) -> boolean().
any_orphaned_objects(Maps) ->
Filtered = lists:filter(fun(M) ->
maps:get(<<"vhost">>, M, undefined) =:= undefined
end, Maps),
length(Filtered) > 0.


-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok' | {error, term()}.

Expand All @@ -424,6 +431,24 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
apply_defs(Map, ActingUser, fun () -> ok end, VHost);
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),

%% If any of the queues or exchanges do not have virtual hosts set,
%% this definition file was a virtual-host specific import. They cannot be applied
%% as "complete" definition imports, most notably, imported on boot.
HasQueuesWithoutVirtualHostField = any_orphaned_objects(maps:get(queues, Map, [])),
HasExchangesWithoutVirtualHostField = any_orphaned_objects(maps:get(exchanges, Map, [])),
HasBindingsWithoutVirtualHostField = any_orphaned_objects(maps:get(bindings, Map, [])),

case (HasQueuesWithoutVirtualHostField and HasExchangesWithoutVirtualHostField and HasBindingsWithoutVirtualHostField) of
true ->
rabbit_log:error("Definitions import: some queues, exchanges or bindings in the definition file "
"are missing the virtual host field. Such files are produced when definitions of "
"a single virtual host are exported. They cannot be used to import definitions at boot time"),
throw({error, invalid_definitions_file});
false ->
ok
end,

try
concurrent_for_all(users, ActingUser, Map,
fun(User, _Username) ->
Expand Down Expand Up @@ -587,8 +612,8 @@ do_concurrent_for_all(List, WorkPoolFun) ->
fun() ->
_ = try
WorkPoolFun(M)
catch {error, E} -> gatherer:in(Gatherer, {error, E});
_:E -> gatherer:in(Gatherer, {error, E})
catch {error, E} -> gatherer:in(Gatherer, {error, E});
_:E:_Stacktrace -> gatherer:in(Gatherer, {error, E})
end,
gatherer:finish(Gatherer)
end)
Expand Down Expand Up @@ -735,6 +760,10 @@ add_queue_int(_Queue, R = #resource{kind = queue,
Name = R#resource.name,
rabbit_log:warning("Skipping import of a queue whose name begins with 'amq.', "
"name: ~ts, acting user: ~ts", [Name, ActingUser]);
add_queue_int(_Queue, R = #resource{kind = queue, virtual_host = undefined}, ActingUser) ->
Name = R#resource.name,
rabbit_log:warning("Skipping import of a queue with an unset virtual host field, "
"name: ~ts, acting user: ~ts", [Name, ActingUser]);
add_queue_int(Queue, Name = #resource{virtual_host = VHostName}, ActingUser) ->
case rabbit_amqqueue:exists(Name) of
true ->
Expand Down Expand Up @@ -828,6 +857,7 @@ validate_limits(All) ->
undefined -> ok;
Queues0 ->
{ok, VHostMap} = filter_out_existing_queues(Queues0),
_ = rabbit_log:debug("Definition import. Virtual host map for validation: ~p", [VHostMap]),
maps:fold(fun validate_vhost_limit/3, ok, VHostMap)
end.

Expand Down Expand Up @@ -859,12 +889,21 @@ build_filtered_map([], AccMap) ->
{ok, AccMap};
build_filtered_map([Queue|Rest], AccMap0) ->
{Rec, VHost} = build_queue_data(Queue),
case rabbit_amqqueue:exists(Rec) of
false ->
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
build_filtered_map(Rest, AccMap1);
true ->
build_filtered_map(Rest, AccMap0)
%% If virtual host is not specified in a queue,
%% this definition file is likely virtual host-specific.
%%
%% Skip such queues.
case VHost of
undefined ->
build_filtered_map(Rest, AccMap0);
_Other ->
case rabbit_amqqueue:exists(Rec) of
false ->
AccMap1 = maps:update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
build_filtered_map(Rest, AccMap1);
true ->
build_filtered_map(Rest, AccMap0)
end
end.

validate_vhost_limit(VHost, AddCount, ok) ->
Expand Down
10 changes: 8 additions & 2 deletions deps/rabbit/test/definition_import_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ groups() ->
import_case17,
import_case18,
import_case19,
import_case20
import_case20,
import_case21
]},

{boot_time_import_using_classic_source, [], [
Expand Down Expand Up @@ -305,6 +306,8 @@ import_case20(Config) ->
{skip, "Should not run in mixed version environments"}
end.

import_case21(Config) -> import_invalid_file_case(Config, "failing_case21").

export_import_round_trip_case1(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false ->
Expand Down Expand Up @@ -386,7 +389,10 @@ import_file_case(Config, Subdirectory, CaseName) ->

import_invalid_file_case(Config, CaseName) ->
CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath]),
try
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath])
catch _:_:_ -> ok
end,
ok.

import_invalid_file_case_in_khepri(Config, CaseName) ->
Expand Down
33 changes: 33 additions & 0 deletions deps/rabbit/test/definition_import_SUITE_data/failing_case21.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"rabbit_version": "3.12.8",
"parameters": [],
"policies": [],
"queues": [
{
"name": "qq.1",
"durable": true,
"auto_delete": false,
"arguments": {
"x-queue-type": "quorum"
}
},
{
"name": "cq.1",
"durable": true,
"auto_delete": false,
"arguments": {
"x-queue-type": "classic"
}
},
{
"name": "sq.1",
"durable": true,
"auto_delete": false,
"arguments": {
"x-queue-type": "stream"
}
}
],
"exchanges": [],
"bindings": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
false ->
{:ok,
"Successfully started definition import. " <>
"This process is asynchronous and can take some time.\n"}
"This process is asynchronous and can take some time. Watch target node logs for completion.\n"}
end
end

Expand Down
4 changes: 3 additions & 1 deletion deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ accept_json(ReqData0, Context) ->
end.

vhost_definitions(ReqData, VHost, Context) ->
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available
%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available.
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
Xs = [strip_vhost(X) || X <- rabbit_mgmt_wm_exchanges:basic(ReqData),
export_exchange(X)],
VQs = [Q || Q <- rabbit_mgmt_wm_queues:basic(ReqData), export_queue(Q)],
Qs = [strip_vhost(Q) || Q <- VQs],
QNames = [{pget(name, Q), pget(vhost, Q)} || Q <- VQs],
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
Bs = [strip_vhost(B) || B <- rabbit_mgmt_wm_bindings:basic(ReqData),
export_binding(B, QNames)],
{ok, Vsn} = application:get_key(rabbit, vsn),
Expand Down