@@ -485,12 +485,12 @@ apply(#{index := Idx}, #garbage_collection{}, State) ->
485
485
update_smallest_raft_index (Idx , ok , State , [{aux , garbage_collection }]);
486
486
apply (Meta , {timeout , expire_msgs }, State ) ->
487
487
checkout (Meta , State , State , []);
488
- apply (#{system_time := Ts , machine_version : = MachineVersion } = Meta ,
488
+ apply (#{system_time := Ts } = Meta ,
489
489
{down , Pid , noconnection },
490
490
#? STATE {consumers = Cons0 ,
491
- cfg = # cfg {consumer_strategy = single_active },
492
- waiting_consumers = Waiting0 ,
493
- enqueuers = Enqs0 } = State0 ) ->
491
+ cfg = # cfg {consumer_strategy = single_active },
492
+ waiting_consumers = Waiting0 ,
493
+ enqueuers = Enqs0 } = State0 ) ->
494
494
Node = node (Pid ),
495
495
% % if the pid refers to an active or cancelled consumer,
496
496
% % mark it as suspected and return it to the waiting queue
@@ -501,14 +501,15 @@ apply(#{system_time := Ts, machine_version := MachineVersion} = Meta,
501
501
% % and checked out messages should be returned
502
502
Effs = consumer_update_active_effects (
503
503
S0 , Cid , C0 , false , suspected_down , E0 ),
504
- C1 = case MachineVersion of
505
- V when V >= 3 ->
506
- C0 ;
507
- 2 ->
508
- Checked = C0 # consumer .checked_out ,
509
- Credit = increase_credit (Meta , C0 , maps :size (Checked )),
510
- C0 # consumer {credit = Credit }
511
- end ,
504
+ C1 = C0 ,
505
+ % case MachineVersion of
506
+ % V when V >= 3 ->
507
+ % C0;
508
+ % 2 ->
509
+ % Checked = C0#consumer.checked_out,
510
+ % Credit = increase_credit(Meta, C0, maps:size(Checked)),
511
+ % C0#consumer{credit = Credit}
512
+ % end,
512
513
{St , Effs1 } = return_all (Meta , S0 , Effs , Cid , C1 ),
513
514
% % if the consumer was cancelled there is a chance it got
514
515
% % removed when returning hence we need to be defensive here
@@ -539,7 +540,7 @@ apply(#{system_time := Ts, machine_version := MachineVersion} = Meta,
539
540
end , Enqs0 ),
540
541
Effects = [{monitor , node , Node } | Effects1 ],
541
542
checkout (Meta , State0 , State #? STATE {enqueuers = Enqs }, Effects );
542
- apply (#{system_time := Ts , machine_version : = MachineVersion } = Meta ,
543
+ apply (#{system_time := Ts } = Meta ,
543
544
{down , Pid , noconnection },
544
545
#? STATE {consumers = Cons0 ,
545
546
enqueuers = Enqs0 } = State0 ) ->
@@ -555,17 +556,9 @@ apply(#{system_time := Ts, machine_version := MachineVersion} = Meta,
555
556
556
557
{State , Effects1 } =
557
558
maps :fold (
558
- fun ({_ , P } = Cid , # consumer {checked_out = Checked0 ,
559
- status = up } = C0 ,
559
+ fun ({_ , P } = Cid , # consumer {status = up } = C0 ,
560
560
{St0 , Eff }) when node (P ) =:= Node ->
561
- C = case MachineVersion of
562
- V when V >= 3 ->
563
- C0 # consumer {status = suspected_down };
564
- 2 ->
565
- Credit = increase_credit (Meta , C0 , map_size (Checked0 )),
566
- C0 # consumer {status = suspected_down ,
567
- credit = Credit }
568
- end ,
561
+ C = C0 # consumer {status = suspected_down },
569
562
{St , Eff0 } = return_all (Meta , St0 , Eff , Cid , C ),
570
563
Eff1 = consumer_update_active_effects (St , Cid , C , false ,
571
564
suspected_down , Eff0 ),
@@ -653,161 +646,10 @@ apply(_Meta, Cmd, State) ->
653
646
rabbit_log :debug (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
654
647
{State , ok , []}.
655
648
656
- convert_msg ({RaftIdx , {Header , empty }}) when is_integer (RaftIdx ) ->
657
- ? MSG (RaftIdx , Header );
658
- convert_msg ({RaftIdx , {Header , _Msg }}) when is_integer (RaftIdx ) ->
659
- ? MSG (RaftIdx , Header );
660
- convert_msg ({'$empty_msg' , Header }) ->
661
- % % dummy index
662
- ? MSG (undefined , Header );
663
- convert_msg ({'$prefix_msg' , Header }) ->
664
- % % dummy index
665
- ? MSG (undefined , Header );
666
- convert_msg ({Header , empty }) ->
667
- convert_msg (Header );
668
- convert_msg (Header ) when ? IS_HEADER (Header ) ->
669
- ? MSG (undefined , Header ).
670
-
671
- convert_consumer_v1_to_v2 ({ConsumerTag , Pid }, CV1 ) ->
672
- Meta = element (2 , CV1 ),
673
- CheckedOut = element (3 , CV1 ),
674
- NextMsgId = element (4 , CV1 ),
675
- Credit = element (5 , CV1 ),
676
- DeliveryCount = element (6 , CV1 ),
677
- CreditMode = element (7 , CV1 ),
678
- LifeTime = element (8 , CV1 ),
679
- Status = element (9 , CV1 ),
680
- Priority = element (10 , CV1 ),
681
- # consumer {cfg = # consumer_cfg {tag = ConsumerTag ,
682
- pid = Pid ,
683
- meta = Meta ,
684
- credit_mode = CreditMode ,
685
- lifetime = LifeTime ,
686
- priority = Priority },
687
- credit = Credit ,
688
- status = Status ,
689
- delivery_count = DeliveryCount ,
690
- next_msg_id = NextMsgId ,
691
- checked_out = maps :map (
692
- fun (_ , {Tag , _ } = Msg ) when is_atom (Tag ) ->
693
- convert_msg (Msg );
694
- (_ , {_Seq , Msg }) ->
695
- convert_msg (Msg )
696
- end , CheckedOut )
697
- }.
698
-
699
- convert_v1_to_v2 (V1State0 ) ->
700
- V1State = rabbit_fifo_v1 :enqueue_all_pending (V1State0 ),
701
- IndexesV1 = rabbit_fifo_v1 :get_field (ra_indexes , V1State ),
702
- ReturnsV1 = rabbit_fifo_v1 :get_field (returns , V1State ),
703
- MessagesV1 = rabbit_fifo_v1 :get_field (messages , V1State ),
704
- ConsumersV1 = rabbit_fifo_v1 :get_field (consumers , V1State ),
705
- WaitingConsumersV1 = rabbit_fifo_v1 :get_field (waiting_consumers , V1State ),
706
- % % remove all raft idx in messages from index
707
- {_ , PrefReturns , _ , PrefMsgs } = rabbit_fifo_v1 :get_field (prefix_msgs , V1State ),
708
- V2PrefMsgs = lists :foldl (fun (Hdr , Acc ) ->
709
- lqueue :in (convert_msg (Hdr ), Acc )
710
- end , lqueue :new (), PrefMsgs ),
711
- V2PrefReturns = lists :foldl (fun (Hdr , Acc ) ->
712
- lqueue :in (convert_msg (Hdr ), Acc )
713
- end , lqueue :new (), PrefReturns ),
714
- MessagesV2 = lqueue :fold (fun ({_ , Msg }, Acc ) ->
715
- lqueue :in (convert_msg (Msg ), Acc )
716
- end , V2PrefMsgs , MessagesV1 ),
717
- ReturnsV2 = lqueue :fold (fun ({_SeqId , Msg }, Acc ) ->
718
- lqueue :in (convert_msg (Msg ), Acc )
719
- end , V2PrefReturns , ReturnsV1 ),
720
- ConsumersV2 = maps :map (
721
- fun (ConsumerId , CV1 ) ->
722
- convert_consumer_v1_to_v2 (ConsumerId , CV1 )
723
- end , ConsumersV1 ),
724
- WaitingConsumersV2 = lists :map (
725
- fun ({ConsumerId , CV1 }) ->
726
- {ConsumerId , convert_consumer_v1_to_v2 (ConsumerId , CV1 )}
727
- end , WaitingConsumersV1 ),
728
- EnqueuersV1 = rabbit_fifo_v1 :get_field (enqueuers , V1State ),
729
- EnqueuersV2 = maps :map (fun (_EnqPid , Enq ) ->
730
- Enq # enqueuer {unused = undefined }
731
- end , EnqueuersV1 ),
732
-
733
- % % do after state conversion
734
- % % The (old) format of dead_letter_handler in RMQ < v3.10 is:
735
- % % {Module, Function, Args}
736
- % % The (new) format of dead_letter_handler in RMQ >= v3.10 is:
737
- % % undefined | {at_most_once, {Module, Function, Args}} | at_least_once
738
- % %
739
- % % Note that the conversion must convert both from old format to new format
740
- % % as well as from new format to new format. The latter is because quorum queues
741
- % % created in RMQ >= v3.10 are still initialised with rabbit_fifo_v0 as described in
742
- % % https://github.com/rabbitmq/ra/blob/e0d1e6315a45f5d3c19875d66f9d7bfaf83a46e3/src/ra_machine.erl#L258-L265
743
- DLH = case rabbit_fifo_v1 :get_cfg_field (dead_letter_handler , V1State ) of
744
- {_M , _F , _A = [_DLX = undefined |_ ]} ->
745
- % % queue was declared in RMQ < v3.10 and no DLX configured
746
- undefined ;
747
- {_M , _F , _A } = MFA ->
748
- % % queue was declared in RMQ < v3.10 and DLX configured
749
- {at_most_once , MFA };
750
- Other ->
751
- Other
752
- end ,
753
-
754
- Cfg = # cfg {name = rabbit_fifo_v1 :get_cfg_field (name , V1State ),
755
- resource = rabbit_fifo_v1 :get_cfg_field (resource , V1State ),
756
- release_cursor_interval = rabbit_fifo_v1 :get_cfg_field (release_cursor_interval , V1State ),
757
- dead_letter_handler = DLH ,
758
- become_leader_handler = rabbit_fifo_v1 :get_cfg_field (become_leader_handler , V1State ),
759
- % % TODO: what if policy enabling reject_publish was applied before conversion?
760
- overflow_strategy = rabbit_fifo_v1 :get_cfg_field (overflow_strategy , V1State ),
761
- max_length = rabbit_fifo_v1 :get_cfg_field (max_length , V1State ),
762
- max_bytes = rabbit_fifo_v1 :get_cfg_field (max_bytes , V1State ),
763
- consumer_strategy = rabbit_fifo_v1 :get_cfg_field (consumer_strategy , V1State ),
764
- delivery_limit = rabbit_fifo_v1 :get_cfg_field (delivery_limit , V1State ),
765
- expires = rabbit_fifo_v1 :get_cfg_field (expires , V1State )
766
- },
767
-
768
- MessagesConsumersV2 = maps :fold (fun (_ConsumerId , # consumer {checked_out = Checked }, Acc ) ->
769
- Acc + maps :size (Checked )
770
- end , 0 , ConsumersV2 ),
771
- MessagesWaitingConsumersV2 = lists :foldl (fun ({_ConsumerId , # consumer {checked_out = Checked }}, Acc ) ->
772
- Acc + maps :size (Checked )
773
- end , 0 , WaitingConsumersV2 ),
774
- MessagesTotal = lqueue :len (MessagesV2 ) +
775
- lqueue :len (ReturnsV2 ) +
776
- MessagesConsumersV2 +
777
- MessagesWaitingConsumersV2 ,
778
-
779
- #? STATE {cfg = Cfg ,
780
- messages = MessagesV2 ,
781
- messages_total = MessagesTotal ,
782
- returns = ReturnsV2 ,
783
- enqueue_count = rabbit_fifo_v1 :get_field (enqueue_count , V1State ),
784
- enqueuers = EnqueuersV2 ,
785
- ra_indexes = IndexesV1 ,
786
- release_cursors = rabbit_fifo_v1 :get_field (release_cursors , V1State ),
787
- consumers = ConsumersV2 ,
788
- service_queue = rabbit_fifo_v1 :get_field (service_queue , V1State ),
789
- msg_bytes_enqueue = rabbit_fifo_v1 :get_field (msg_bytes_enqueue , V1State ),
790
- msg_bytes_checkout = rabbit_fifo_v1 :get_field (msg_bytes_checkout , V1State ),
791
- waiting_consumers = WaitingConsumersV2 ,
792
- last_active = rabbit_fifo_v1 :get_field (last_active , V1State )
793
- }.
794
-
795
- convert_v2_to_v3 (# rabbit_fifo {consumers = ConsumersV2 } = StateV2 ) ->
796
- ConsumersV3 = maps :map (fun (_ , C ) ->
797
- convert_consumer_v2_to_v3 (C )
798
- end , ConsumersV2 ),
799
- StateV2 # rabbit_fifo {consumers = ConsumersV3 }.
800
-
801
649
convert_v3_to_v4 (# rabbit_fifo {} = StateV3 ) ->
802
650
% % nothing to convert - yet
803
651
StateV3 .
804
652
805
- convert_consumer_v2_to_v3 (C = # consumer {cfg = Cfg = # consumer_cfg {credit_mode = simple_prefetch ,
806
- meta = #{prefetch := Prefetch }}}) ->
807
- C # consumer {cfg = Cfg # consumer_cfg {credit_mode = {simple_prefetch , Prefetch }}};
808
- convert_consumer_v2_to_v3 (C ) ->
809
- C .
810
-
811
653
purge_node (Meta , Node , State , Effects ) ->
812
654
lists :foldl (fun (Pid , {S0 , E0 }) ->
813
655
{S , E } = handle_down (Meta , Pid , S0 ),
@@ -1702,25 +1544,23 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Effects0,
1702
1544
{duplicate , State0 , Effects0 }
1703
1545
end .
1704
1546
1705
- return (#{index := IncomingRaftIdx , machine_version : = MachineVersion } = Meta ,
1547
+ return (#{index := IncomingRaftIdx } = Meta ,
1706
1548
ConsumerId , Returned , Effects0 , State0 ) ->
1707
1549
{State1 , Effects1 } = maps :fold (
1708
1550
fun (MsgId , Msg , {S0 , E0 }) ->
1709
1551
return_one (Meta , MsgId , Msg , S0 , E0 , ConsumerId )
1710
1552
end , {State0 , Effects0 }, Returned ),
1711
- State2 =
1712
- case State1 #? STATE .consumers of
1713
- #{ConsumerId := Con }
1714
- when MachineVersion >= 3 ->
1715
- update_or_remove_sub (Meta , ConsumerId , Con , State1 );
1716
- #{ConsumerId := Con0 }
1717
- when MachineVersion =:= 2 ->
1718
- Credit = increase_credit (Meta , Con0 , map_size (Returned )),
1719
- Con = Con0 # consumer {credit = Credit },
1720
- update_or_remove_sub (Meta , ConsumerId , Con , State1 );
1721
- _ ->
1722
- State1
1723
- end ,
1553
+ State2 = case State1 #? STATE .consumers of
1554
+ #{ConsumerId := Con } ->
1555
+ update_or_remove_sub (Meta , ConsumerId , Con , State1 );
1556
+ % #{ConsumerId := Con0}
1557
+ % when MachineVersion =:= 2 ->
1558
+ % Credit = increase_credit(Meta, Con0, map_size(Returned)),
1559
+ % Con = Con0#consumer{credit = Credit},
1560
+ % update_or_remove_sub(Meta, ConsumerId, Con, State1);
1561
+ _ ->
1562
+ State1
1563
+ end ,
1724
1564
{State , ok , Effects } = checkout (Meta , State0 , State2 , Effects1 ),
1725
1565
update_smallest_raft_index (IncomingRaftIdx , State , Effects ).
1726
1566
@@ -1776,10 +1616,10 @@ increase_credit(_Meta, #consumer{cfg = #consumer_cfg{lifetime = auto,
1776
1616
credit = Credit }, _ ) ->
1777
1617
% % credit_mode: `credited' also doesn't automatically increment credit
1778
1618
Credit ;
1779
- increase_credit (#{ machine_version : = MachineVersion } ,
1619
+ increase_credit (_Meta ,
1780
1620
# consumer {cfg = # consumer_cfg {credit_mode = {simple_prefetch , MaxCredit }},
1781
1621
credit = Current }, Credit )
1782
- when MachineVersion >= 3 andalso MaxCredit > 0 ->
1622
+ when MaxCredit > 0 ->
1783
1623
min (MaxCredit , Current + Credit );
1784
1624
increase_credit (_Meta , # consumer {credit = Current }, Credit ) ->
1785
1625
Current + Credit .
@@ -1885,8 +1725,7 @@ get_header(Key, Header)
1885
1725
when is_map (Header ) andalso is_map_key (size , Header ) ->
1886
1726
maps :get (Key , Header , undefined ).
1887
1727
1888
- return_one (#{machine_version := MachineVersion } = Meta ,
1889
- MsgId , Msg0 ,
1728
+ return_one (Meta , MsgId , Msg0 ,
1890
1729
#? STATE {returns = Returns ,
1891
1730
consumers = Consumers ,
1892
1731
dlx = DlxState0 ,
@@ -1904,13 +1743,8 @@ return_one(#{machine_version := MachineVersion} = Meta,
1904
1743
{State , DlxEffects ++ Effects0 };
1905
1744
_ ->
1906
1745
Checked = maps :remove (MsgId , Checked0 ),
1907
- Con = case MachineVersion of
1908
- V when V >= 3 ->
1909
- Con0 # consumer {checked_out = Checked ,
1910
- credit = increase_credit (Meta , Con0 , 1 )};
1911
- 2 ->
1912
- Con0 # consumer {checked_out = Checked }
1913
- end ,
1746
+ Con = Con0 # consumer {checked_out = Checked ,
1747
+ credit = increase_credit (Meta , Con0 , 1 )},
1914
1748
{add_bytes_return (
1915
1749
Header ,
1916
1750
State0 #? STATE {consumers = Consumers #{ConsumerId => Con },
@@ -2317,8 +2151,7 @@ merge_consumer(Meta, #consumer{cfg = CCfg, checked_out = Checked} = Consumer,
2317
2151
status = up ,
2318
2152
credit = NewCredit }.
2319
2153
2320
- credit_mode (#{machine_version := Vsn }, Credit , simple_prefetch )
2321
- when Vsn >= 3 ->
2154
+ credit_mode (_Meta , Credit , simple_prefetch ) ->
2322
2155
{simple_prefetch , Credit };
2323
2156
credit_mode (_ , _ , Mode ) ->
2324
2157
Mode .
@@ -2542,9 +2375,9 @@ convert(To, To, State) ->
2542
2375
convert (0 , To , State ) ->
2543
2376
convert (1 , To , rabbit_fifo_v1 :convert_v0_to_v1 (State ));
2544
2377
convert (1 , To , State ) ->
2545
- convert (2 , To , convert_v1_to_v2 (State ));
2378
+ convert (2 , To , rabbit_fifo_v3 : convert_v1_to_v2 (State ));
2546
2379
convert (2 , To , State ) ->
2547
- convert (3 , To , convert_v2_to_v3 (State ));
2380
+ convert (3 , To , rabbit_fifo_v3 : convert_v2_to_v3 (State ));
2548
2381
convert (3 , To , State ) ->
2549
2382
convert (4 , To , convert_v3_to_v4 (State )).
2550
2383
0 commit comments