80
80
'bindings' |
81
81
'exchanges' .
82
82
83
- -type definition_object () :: #{binary () => any ()}.
83
+ -type definition_key () :: binary () | atom ().
84
+ -type definition_object () :: #{definition_key () => any ()}.
84
85
-type definition_list () :: [definition_object ()].
85
86
86
87
-type definitions () :: #{
@@ -104,18 +105,73 @@ maybe_load_definitions() ->
104
105
{error , E } -> {error , E }
105
106
end .
106
107
107
- validate_definitions (Defs ) when is_list (Defs ) ->
108
+ -spec validate_parsing_of_doc (any ()) -> boolean ().
109
+ validate_parsing_of_doc (Body ) when is_binary (Body ) ->
110
+ case decode (Body ) of
111
+ {ok , _Map } -> true ;
112
+ {error , _Err } -> false
113
+ end .
114
+
115
+ -spec validate_parsing_of_doc_collection (list (any ())) -> boolean ().
116
+ validate_parsing_of_doc_collection (Defs ) when is_list (Defs ) ->
108
117
lists :foldl (fun (_Body , false ) ->
109
- false ;
110
- (Body , true ) ->
111
- case decode (Body ) of
112
- {ok , _Map } -> true ;
113
- {error , _Err } -> false
114
- end
115
- end , true , Defs );
118
+ false ;
119
+ (Body , true ) ->
120
+ case decode (Body ) of
121
+ {ok , _Map } -> true ;
122
+ {error , _Err } -> false
123
+ end
124
+ end , true , Defs ).
125
+
126
+ -spec filter_orphaned_objects (definition_list ()) -> definition_list ().
127
+ filter_orphaned_objects (Maps ) ->
128
+ lists :filter (fun (M ) -> maps :get (<<" vhost" >>, M , undefined ) =:= undefined end , Maps ).
129
+
130
+ -spec any_orphaned_objects (definition_list ()) -> boolean ().
131
+ any_orphaned_objects (Maps ) ->
132
+ length (filter_orphaned_objects (Maps )) > 0 .
133
+
134
+ -spec any_orphaned_in_definitions (definitions ()) -> boolean ().
135
+ any_orphaned_in_definitions (DefsMap ) ->
136
+ any_orphaned_in_category (DefsMap , <<" queues" >>)
137
+ orelse any_orphaned_in_category (DefsMap , <<" exchanges" >>)
138
+ orelse any_orphaned_in_category (DefsMap , <<" bindings" >>).
139
+
140
+ -spec any_orphaned_in_category (definitions (), definition_category () | binary ()) -> boolean ().
141
+ any_orphaned_in_category (DefsMap , Category ) ->
142
+ % % try both binary and atom keys
143
+ any_orphaned_objects (maps :get (Category , DefsMap ,
144
+ maps :get (rabbit_data_coercion :to_atom (Category ), DefsMap , []))).
145
+
146
+ -spec validate_orphaned_objects_in_doc_collection (list () | binary ()) -> boolean ().
147
+ validate_orphaned_objects_in_doc_collection (Defs ) when is_list (Defs ) ->
148
+ lists :foldl (fun (_Body , false ) ->
149
+ false ;
150
+ (Body , true ) ->
151
+ validate_parsing_of_doc (Body )
152
+ end , true , Defs ).
153
+
154
+ -spec validate_orphaned_objects_in_doc (binary ()) -> boolean ().
155
+ validate_orphaned_objects_in_doc (Body ) when is_binary (Body ) ->
156
+ case decode (Body ) of
157
+ {ok , DefsMap } ->
158
+ AnyOrphaned = any_orphaned_in_definitions (DefsMap ),
159
+ case AnyOrphaned of
160
+ true ->
161
+ log_an_error_about_orphaned_objects ();
162
+ false -> ok
163
+ end ,
164
+ AnyOrphaned ;
165
+ {error , _Err } -> false
166
+ end .
167
+
168
+ -spec validate_definitions (list (any ()) | binary ()) -> boolean ().
169
+ validate_definitions (Defs ) when is_list (Defs ) ->
170
+ validate_parsing_of_doc_collection (Defs ) andalso
171
+ validate_orphaned_objects_in_doc_collection (Defs );
116
172
validate_definitions (Body ) when is_binary (Body ) ->
117
173
case decode (Body ) of
118
- {ok , _Map } -> true ;
174
+ {ok , Defs } -> validate_orphaned_objects_in_doc ( Defs ) ;
119
175
{error , _Err } -> false
120
176
end .
121
177
@@ -409,13 +465,10 @@ should_skip_if_unchanged() ->
409
465
ReachedTargetClusterSize = rabbit_nodes :reached_target_cluster_size (),
410
466
OptedIn andalso ReachedTargetClusterSize .
411
467
412
- -spec any_orphaned_objects (list (#{atom () => any ()})) -> boolean ().
413
- any_orphaned_objects (Maps ) ->
414
- Filtered = lists :filter (fun (M ) ->
415
- maps :get (<<" vhost" >>, M , undefined ) =:= undefined
416
- end , Maps ),
417
- length (Filtered ) > 0 .
418
-
468
+ log_an_error_about_orphaned_objects () ->
469
+ rabbit_log :error (" Definitions import: some queues, exchanges or bindings in the definition file "
470
+ " are missing the virtual host field. Such files are produced when definitions of "
471
+ " a single virtual host are exported. They cannot be used to import definitions at boot time" ).
419
472
420
473
-spec apply_defs (Map :: #{atom () => any ()}, ActingUser :: rabbit_types :username ()) -> 'ok' | {error , term ()}.
421
474
@@ -435,15 +488,11 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
435
488
% % If any of the queues or exchanges do not have virtual hosts set,
436
489
% % this definition file was a virtual-host specific import. They cannot be applied
437
490
% % as "complete" definition imports, most notably, imported on boot.
438
- HasQueuesWithoutVirtualHostField = any_orphaned_objects (maps :get (queues , Map , [])),
439
- HasExchangesWithoutVirtualHostField = any_orphaned_objects (maps :get (exchanges , Map , [])),
440
- HasBindingsWithoutVirtualHostField = any_orphaned_objects (maps :get (bindings , Map , [])),
491
+ AnyOrphaned = any_orphaned_in_definitions (Map ),
441
492
442
- case ( HasQueuesWithoutVirtualHostField orelse HasExchangesWithoutVirtualHostField orelse HasBindingsWithoutVirtualHostField ) of
493
+ case AnyOrphaned of
443
494
true ->
444
- rabbit_log :error (" Definitions import: some queues, exchanges or bindings in the definition file "
445
- " are missing the virtual host field. Such files are produced when definitions of "
446
- " a single virtual host are exported. They cannot be used to import definitions at boot time" ),
495
+ log_an_error_about_orphaned_objects (),
447
496
throw ({error , invalid_definitions_file });
448
497
false ->
449
498
ok
@@ -612,8 +661,11 @@ do_concurrent_for_all(List, WorkPoolFun) ->
612
661
fun () ->
613
662
_ = try
614
663
WorkPoolFun (M )
615
- catch {error , E } -> gatherer :in (Gatherer , {error , E });
616
- _ :E :_Stacktrace -> gatherer :in (Gatherer , {error , E })
664
+ catch {error , E } -> gatherer :in (Gatherer , {error , E });
665
+ _ :E :Stacktrace ->
666
+ rabbit_log :debug (" Definition import: a work pool operation has thrown an exception ~s t, stacktrace: ~p " ,
667
+ [E , Stacktrace ]),
668
+ gatherer :in (Gatherer , {error , E })
617
669
end ,
618
670
gatherer :finish (Gatherer )
619
671
end )
@@ -882,21 +934,23 @@ filter_out_existing_queues(VHost, Queues) ->
882
934
883
935
build_queue_data (Queue ) ->
884
936
VHost = maps :get (<<" vhost" >>, Queue , undefined ),
885
- Rec = rv (VHost , queue , <<" name" >>, Queue ),
886
- {Rec , VHost }.
937
+ case VHost of
938
+ undefined -> undefined ;
939
+ Value ->
940
+ Rec = rv (Value , queue , <<" name" >>, Queue ),
941
+ {Rec , VHost }
942
+ end .
887
943
888
944
build_filtered_map ([], AccMap ) ->
889
945
{ok , AccMap };
890
946
build_filtered_map ([Queue |Rest ], AccMap0 ) ->
891
- {Rec , VHost } = build_queue_data (Queue ),
892
947
% % If virtual host is not specified in a queue,
893
948
% % this definition file is likely virtual host-specific.
894
949
% %
895
950
% % Skip such queues.
896
- case VHost of
897
- undefined ->
898
- build_filtered_map (Rest , AccMap0 );
899
- _Other ->
951
+ case build_queue_data (Queue ) of
952
+ undefined -> build_filtered_map (Rest , AccMap0 );
953
+ {Rec , VHost } when VHost =/= undefined ->
900
954
case rabbit_amqqueue :exists (Rec ) of
901
955
false ->
902
956
AccMap1 = maps :update_with (VHost , fun (V ) -> V + 1 end , 1 , AccMap0 ),
0 commit comments