80
80
'bindings' |
81
81
'exchanges' .
82
82
83
- -type definition_object () :: #{binary () => any ()}.
83
+ -type definition_key () :: binary () | atom ().
84
+ -type definition_object () :: #{definition_key () => any ()}.
84
85
-type definition_list () :: [definition_object ()].
85
86
86
87
-type definitions () :: #{
@@ -104,18 +105,73 @@ maybe_load_definitions() ->
104
105
{error , E } -> {error , E }
105
106
end .
106
107
107
- validate_definitions (Defs ) when is_list (Defs ) ->
108
+ -spec validate_parsing_of_doc (any ()) -> boolean ().
109
+ validate_parsing_of_doc (Body ) when is_binary (Body ) ->
110
+ case decode (Body ) of
111
+ {ok , _Map } -> true ;
112
+ {error , _Err } -> false
113
+ end .
114
+
115
+ -spec validate_parsing_of_doc_collection (list (any ())) -> boolean ().
116
+ validate_parsing_of_doc_collection (Defs ) when is_list (Defs ) ->
117
+ lists :foldl (fun (_Body , false ) ->
118
+ false ;
119
+ (Body , true ) ->
120
+ case decode (Body ) of
121
+ {ok , _Map } -> true ;
122
+ {error , _Err } -> false
123
+ end
124
+ end , true , Defs ).
125
+
126
+ -spec filter_orphaned_objects (definition_list ()) -> definition_list ().
127
+ filter_orphaned_objects (Maps ) ->
128
+ lists :filter (fun (M ) -> maps :get (<<" vhost" >>, M , undefined ) =:= undefined end , Maps ).
129
+
130
+ -spec any_orphaned_objects (definition_list ()) -> boolean ().
131
+ any_orphaned_objects (Maps ) ->
132
+ length (filter_orphaned_objects (Maps )) > 0 .
133
+
134
+ -spec any_orphaned_in_doc (definitions ()) -> boolean ().
135
+ any_orphaned_in_doc (DefsMap ) ->
136
+ any_orphaned_in_category (DefsMap , <<" queues" >>)
137
+ orelse any_orphaned_in_category (DefsMap , <<" exchanges" >>)
138
+ orelse any_orphaned_in_category (DefsMap , <<" bindings" >>).
139
+
140
+ -spec any_orphaned_in_category (definitions (), definition_category () | binary ()) -> boolean ().
141
+ any_orphaned_in_category (DefsMap , Category ) ->
142
+ % % try both binary and atom keys
143
+ any_orphaned_objects (maps :get (Category , DefsMap ,
144
+ maps :get (rabbit_data_coercion :to_atom (Category ), DefsMap , []))).
145
+
146
+ -spec validate_orphaned_objects_in_doc_collection (list () | binary ()) -> boolean ().
147
+ validate_orphaned_objects_in_doc_collection (Defs ) when is_list (Defs ) ->
108
148
lists :foldl (fun (_Body , false ) ->
109
- false ;
110
- (Body , true ) ->
111
- case decode (Body ) of
112
- {ok , _Map } -> true ;
113
- {error , _Err } -> false
114
- end
115
- end , true , Defs );
149
+ false ;
150
+ (Body , true ) ->
151
+ validate_parsing_of_doc (Body )
152
+ end , true , Defs ).
153
+
154
+ -spec validate_orphaned_objects_in_doc (binary ()) -> boolean ().
155
+ validate_orphaned_objects_in_doc (Body ) when is_binary (Body ) ->
156
+ case decode (Body ) of
157
+ {ok , DefsMap } ->
158
+ AnyOrphaned = any_orphaned_in_doc (DefsMap ),
159
+ case AnyOrphaned of
160
+ true ->
161
+ log_an_error_about_orphaned_objects ();
162
+ false -> ok
163
+ end ,
164
+ AnyOrphaned ;
165
+ {error , _Err } -> false
166
+ end .
167
+
168
+ -spec validate_definitions (list (any ()) | binary ()) -> boolean ().
169
+ validate_definitions (Defs ) when is_list (Defs ) ->
170
+ validate_parsing_of_doc_collection (Defs ) andalso
171
+ validate_orphaned_objects_in_doc_collection (Defs );
116
172
validate_definitions (Body ) when is_binary (Body ) ->
117
173
case decode (Body ) of
118
- {ok , _Map } -> true ;
174
+ {ok , Defs } -> validate_orphaned_objects_in_doc ( Defs ) ;
119
175
{error , _Err } -> false
120
176
end .
121
177
@@ -284,6 +340,7 @@ maybe_load_definitions_from_local_filesystem(App, Key) ->
284
340
undefined -> ok ;
285
341
{ok , none } -> ok ;
286
342
{ok , Path } ->
343
+ rabbit_log :debug (" ~ts .~ts is set to '~ts ', will discover definition file(s) to import" , [App , Key , Path ]),
287
344
IsDir = filelib :is_dir (Path ),
288
345
Mod = rabbit_definitions_import_local_filesystem ,
289
346
rabbit_log :debug (" Will use module ~ts to import definitions" , [Mod ]),
@@ -409,6 +466,10 @@ should_skip_if_unchanged() ->
409
466
ReachedTargetClusterSize = rabbit_nodes :reached_target_cluster_size (),
410
467
OptedIn andalso ReachedTargetClusterSize .
411
468
469
+ log_an_error_about_orphaned_objects () ->
470
+ rabbit_log :error (" Definitions import: some queues, exchanges or bindings in the definition file "
471
+ " are missing the virtual host field. Such files are produced when definitions of "
472
+ " a single virtual host are exported. They cannot be used to import definitions at boot time" ).
412
473
413
474
-spec apply_defs (Map :: #{atom () => any ()}, ActingUser :: rabbit_types :username ()) -> 'ok' | {error , term ()}.
414
475
@@ -424,6 +485,20 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
424
485
apply_defs (Map , ActingUser , fun () -> ok end , VHost );
425
486
apply_defs (Map , ActingUser , SuccessFun ) when is_function (SuccessFun ) ->
426
487
Version = maps :get (rabbitmq_version , Map , maps :get (rabbit_version , Map , undefined )),
488
+
489
+ % % If any of the queues or exchanges do not have virtual hosts set,
490
+ % % this definition file was a virtual-host specific import. They cannot be applied
491
+ % % as "complete" definition imports, most notably, imported on boot.
492
+ AnyOrphaned = any_orphaned_in_doc (Map ),
493
+
494
+ case AnyOrphaned of
495
+ true ->
496
+ log_an_error_about_orphaned_objects (),
497
+ throw ({error , invalid_definitions_file });
498
+ false ->
499
+ ok
500
+ end ,
501
+
427
502
try
428
503
concurrent_for_all (users , ActingUser , Map ,
429
504
fun (User , _Username ) ->
@@ -587,8 +662,11 @@ do_concurrent_for_all(List, WorkPoolFun) ->
587
662
fun () ->
588
663
_ = try
589
664
WorkPoolFun (M )
590
- catch {error , E } -> gatherer :in (Gatherer , {error , E });
591
- _ :E -> gatherer :in (Gatherer , {error , E })
665
+ catch {error , E } -> gatherer :in (Gatherer , {error , E });
666
+ _ :E :Stacktrace ->
667
+ rabbit_log :debug (" Definition import: a work pool operation has thrown an exception ~s t, stacktrace: ~p " ,
668
+ [E , Stacktrace ]),
669
+ gatherer :in (Gatherer , {error , E })
592
670
end ,
593
671
gatherer :finish (Gatherer )
594
672
end )
@@ -735,6 +813,10 @@ add_queue_int(_Queue, R = #resource{kind = queue,
735
813
Name = R # resource .name ,
736
814
rabbit_log :warning (" Skipping import of a queue whose name begins with 'amq.', "
737
815
" name: ~ts , acting user: ~ts " , [Name , ActingUser ]);
816
+ add_queue_int (_Queue , R = # resource {kind = queue , virtual_host = undefined }, ActingUser ) ->
817
+ Name = R # resource .name ,
818
+ rabbit_log :warning (" Skipping import of a queue with an unset virtual host field, "
819
+ " name: ~ts , acting user: ~ts " , [Name , ActingUser ]);
738
820
add_queue_int (Queue , Name = # resource {virtual_host = VHostName }, ActingUser ) ->
739
821
case rabbit_amqqueue :exists (Name ) of
740
822
true ->
@@ -828,6 +910,7 @@ validate_limits(All) ->
828
910
undefined -> ok ;
829
911
Queues0 ->
830
912
{ok , VHostMap } = filter_out_existing_queues (Queues0 ),
913
+ _ = rabbit_log :debug (" Definition import. Virtual host map for validation: ~p " , [VHostMap ]),
831
914
maps :fold (fun validate_vhost_limit /3 , ok , VHostMap )
832
915
end .
833
916
@@ -852,19 +935,30 @@ filter_out_existing_queues(VHost, Queues) ->
852
935
853
936
build_queue_data (Queue ) ->
854
937
VHost = maps :get (<<" vhost" >>, Queue , undefined ),
855
- Rec = rv (VHost , queue , <<" name" >>, Queue ),
856
- {Rec , VHost }.
938
+ case VHost of
939
+ undefined -> undefined ;
940
+ Value ->
941
+ Rec = rv (Value , queue , <<" name" >>, Queue ),
942
+ {Rec , VHost }
943
+ end .
857
944
858
945
build_filtered_map ([], AccMap ) ->
859
946
{ok , AccMap };
860
947
build_filtered_map ([Queue |Rest ], AccMap0 ) ->
861
- {Rec , VHost } = build_queue_data (Queue ),
862
- case rabbit_amqqueue :exists (Rec ) of
863
- false ->
864
- AccMap1 = maps :update_with (VHost , fun (V ) -> V + 1 end , 1 , AccMap0 ),
865
- build_filtered_map (Rest , AccMap1 );
866
- true ->
867
- build_filtered_map (Rest , AccMap0 )
948
+ % % If virtual host is not specified in a queue,
949
+ % % this definition file is likely virtual host-specific.
950
+ % %
951
+ % % Skip such queues.
952
+ case build_queue_data (Queue ) of
953
+ undefined -> build_filtered_map (Rest , AccMap0 );
954
+ {Rec , VHost } when VHost =/= undefined ->
955
+ case rabbit_amqqueue :exists (Rec ) of
956
+ false ->
957
+ AccMap1 = maps :update_with (VHost , fun (V ) -> V + 1 end , 1 , AccMap0 ),
958
+ build_filtered_map (Rest , AccMap1 );
959
+ true ->
960
+ build_filtered_map (Rest , AccMap0 )
961
+ end
868
962
end .
869
963
870
964
validate_vhost_limit (VHost , AddCount , ok ) ->
0 commit comments