Skip to content

Commit ef45f34

Browse files
author
Loïc Hoguin
committed
Remove availability of CQv1
Most of the v1 code is still around as it is needed for conversion to v2. It will be removed at a later time when conversion is no longer supported. We don't shard the CQ property suite anymore: there's only 1 case remaining.
1 parent 9145b53 commit ef45f34

11 files changed

+114
-808
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,6 @@ rabbitmq_integration_suite(
322322
rabbitmq_integration_suite(
323323
name = "classic_queue_prop_SUITE",
324324
size = "large",
325-
shard_count = 6,
326-
sharding_method = "case",
327325
deps = [
328326
"@proper//:erlang_app",
329327
],

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2504,7 +2504,7 @@ end}.
25042504

25052505
{translation, "rabbit.classic_queue_default_version",
25062506
fun(Conf) ->
2507-
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
2507+
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
25082508
1 -> 1;
25092509
2 -> 2;
25102510
_ -> cuttlefish:unset()

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,
477477

478478
init_queue_version(Version0, State = #q {backing_queue = BQ,
479479
backing_queue_state = BQS}) ->
480-
%% When the version is undefined we use the default version 1.
481-
%% We want to BQ:set_queue_version in all cases because a v2
482-
%% policy might have been deleted, for example, and we want
483-
%% the queue to go back to v1.
484480
Version = case Version0 of
485-
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
481+
undefined -> 2;
486482
_ -> Version0
487483
end,
488484
BQS1 = BQ:set_queue_version(Version, BQS),

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms,
10781078
end,
10791079
State#qi{ confirms = sets:new([{version,2}]) }.
10801080

1081-
-spec needs_sync(state()) -> 'false'.
1081+
-spec needs_sync(state()) -> 'false' | 'confirms'.
10821082

10831083
needs_sync(State = #qi{ confirms = Confirms }) ->
10841084
?DEBUG("~0p", [State]),

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 7 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
313313
'undefined' | non_neg_integer(), qistate()}.
314314

315315
recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
316-
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) ->
316+
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
317+
%% We only allow using this module when converting to v2.
318+
convert) ->
317319
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
318320
put(segment_entry_count, SegmentEntryCount),
319321
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
@@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
323325
CleanShutdown = Terms /= non_clean_shutdown,
324326
case CleanShutdown andalso MsgStoreRecovered of
325327
true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
326-
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context);
328+
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
327329
RecoveredCounts -> init_clean(RecoveredCounts, State1)
328330
end;
329-
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context)
331+
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
330332
end.
331333

332334
-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
@@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
644646
-define(RECOVER_BYTES, 2).
645647
-define(RECOVER_COUNTER_SIZE, 2).
646648

647-
init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
649+
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
648650
%% Recover the journal completely. This will also load segments
649651
%% which have entries in the journal and remove duplicates. The
650652
%% counts will correctly reflect the combination of the segment
@@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
679681
%% recovery fails with a crash.
680682
State2 = flush_journal(State1 #qistate { segments = Segments1,
681683
dirty_count = DirtyCount }),
682-
case Context of
683-
convert ->
684-
{Count, Bytes, State2};
685-
main ->
686-
%% We try to see if there are segment files from the v2 index.
687-
case rabbit_file:wildcard(".*\\.qi", Dir) of
688-
%% We are recovering a dirty queue that was using the v2 index or in
689-
%% the process of converting from v2 to v1.
690-
[_|_] ->
691-
#resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name,
692-
rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert",
693-
[QName, VHost, Count]),
694-
CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []),
695-
State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef),
696-
{Count + counters:get(CountersRef, ?RECOVER_COUNT),
697-
Bytes + counters:get(CountersRef, ?RECOVER_BYTES),
698-
State3};
699-
%% Otherwise keep default values.
700-
[] ->
701-
{Count, Bytes, State2}
702-
end
703-
end.
704-
705-
recover_index_v2_dirty(State0 = #qistate { queue_name = Name,
706-
on_sync = OnSyncFun,
707-
on_sync_msg = OnSyncMsgFun },
708-
ContainsCheckFun, CountersRef) ->
709-
#resource{virtual_host = VHost, name = QName} = Name,
710-
rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]),
711-
%% We cannot use the counts/bytes because some messages may be in both
712-
%% the v1 and v2 indexes after a crash.
713-
{_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true,
714-
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
715-
convert),
716-
State = recover_index_v2_common(State0, V2State, CountersRef),
717-
rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1",
718-
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
719-
State.
720-
721-
%% At this point all messages are persistent because transient messages
722-
%% were dropped during the v2 index recovery.
723-
recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir },
724-
V2State, CountersRef) ->
725-
%% Use a temporary per-queue store state to read embedded messages.
726-
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
727-
%% Go through the v2 index and publish messages to v1 index.
728-
{LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State),
729-
%% When resuming after a crash we need to double check the messages that are both
730-
%% in the v1 and v2 index (effectively the messages below the upper bound of the
731-
%% v1 index that are about to be written to it).
732-
{_, V1HiSeqId, _} = bounds(State0),
733-
SkipFun = fun
734-
(SeqId, FunState0) when SeqId < V1HiSeqId ->
735-
case read(SeqId, SeqId + 1, FunState0) of
736-
%% Message already exists, skip.
737-
{[_], FunState} ->
738-
{skip, FunState};
739-
%% Message doesn't exist, write.
740-
{[], FunState} ->
741-
{write, FunState}
742-
end;
743-
%% Message is out of bounds of the v1 index.
744-
(_, FunState) ->
745-
{write, FunState}
746-
end,
747-
%% We use a common function also used with conversion on policy change.
748-
{State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0,
749-
{CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES},
750-
LoSeqId, HiSeqId, SkipFun),
751-
%% Delete any remaining v2 index files.
752-
OldFiles = rabbit_file:wildcard(".*\\.qi", Dir)
753-
++ rabbit_file:wildcard(".*\\.qs", Dir),
754-
_ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles],
755-
%% Ensure that everything in the v1 index is written to disk.
756-
State = flush(State1),
757-
%% Clean up all the garbage that we have surely been creating.
758-
garbage_collect(),
759-
State.
684+
{Count, Bytes, State2}.
760685

761686
terminate(State = #qistate { journal_handle = JournalHdl,
762687
segments = Segments }) ->

0 commit comments

Comments
 (0)