@@ -1400,10 +1400,10 @@ cancel_consumer0(Meta, ConsumerKey,
1400
1400
activate_next_consumer ({State , Effects }) ->
1401
1401
activate_next_consumer (State , Effects ).
1402
1402
1403
- activate_next_consumer (#? STATE {cfg = # cfg {consumer_strategy = competing }} = State0 ,
1404
- Effects0 ) ->
1405
- {State0 , Effects0 };
1406
- activate_next_consumer (#? STATE {consumers = Cons ,
1403
+ activate_next_consumer (#? STATE {cfg = # cfg {consumer_strategy = competing }} = State ,
1404
+ Effects ) ->
1405
+ {State , Effects };
1406
+ activate_next_consumer (#? STATE {consumers = Cons0 ,
1407
1407
waiting_consumers = Waiting0 } = State0 ,
1408
1408
Effects0 ) ->
1409
1409
% % invariant, the waiting list always need to be sorted by consumers that are
@@ -1416,11 +1416,11 @@ activate_next_consumer(#?STATE{consumers = Cons,
1416
1416
undefined
1417
1417
end ,
1418
1418
1419
- case {active_consumer (Cons ), NextConsumer } of
1419
+ case {active_consumer (Cons0 ), NextConsumer } of
1420
1420
{undefined , {NextCKey , # consumer {cfg = NextCCfg } = NextC }} ->
1421
1421
Remaining = tl (Waiting0 ),
1422
1422
% % TODO: can this happen?
1423
- Consumer = case maps :get (NextCKey , Cons , undefined ) of
1423
+ Consumer = case maps :get (NextCKey , Cons0 , undefined ) of
1424
1424
undefined ->
1425
1425
NextC ;
1426
1426
Existing ->
@@ -1433,7 +1433,7 @@ activate_next_consumer(#?STATE{consumers = Cons,
1433
1433
ServiceQueue1 = maybe_queue_consumer (NextCKey ,
1434
1434
Consumer ,
1435
1435
ServiceQueue ),
1436
- State = State0 #? STATE {consumers = Cons #{NextCKey => Consumer },
1436
+ State = State0 #? STATE {consumers = Cons0 #{NextCKey => Consumer },
1437
1437
service_queue = ServiceQueue1 ,
1438
1438
waiting_consumers = Remaining },
1439
1439
Effects = consumer_update_active_effects (State , Consumer ,
@@ -1452,11 +1452,12 @@ activate_next_consumer(#?STATE{consumers = Cons,
1452
1452
ServiceQueue1 = maybe_queue_consumer (NextCKey ,
1453
1453
Consumer ,
1454
1454
ServiceQueue ),
1455
- State = State0 #? STATE {consumers = maps :remove (ActiveCKey ,
1456
- Cons #{NextCKey => Consumer }),
1455
+ Cons1 = Cons0 #{NextCKey => Consumer },
1456
+ Cons = maps :remove (ActiveCKey , Cons1 ),
1457
+ Waiting = add_waiting ({ActiveCKey , Active }, Remaining ),
1458
+ State = State0 #? STATE {consumers = Cons ,
1457
1459
service_queue = ServiceQueue1 ,
1458
- waiting_consumers =
1459
- add_waiting ({ActiveCKey , Active }, Remaining )},
1460
+ waiting_consumers = Waiting },
1460
1461
Effects = consumer_update_active_effects (State , Consumer ,
1461
1462
true , single_active ,
1462
1463
Effects0 ),
@@ -1466,9 +1467,10 @@ activate_next_consumer(#?STATE{consumers = Cons,
1466
1467
when WaitingPriority > ActivePriority ->
1467
1468
% % A higher priority consumer has attached but the current one has
1468
1469
% % pending messages
1469
- {State0 #? STATE {consumers =
1470
- Cons #{ActiveCKey => Active # consumer {status = fading }}},
1471
- Effects0 };
1470
+ Cons = maps :update (ActiveCKey ,
1471
+ Active # consumer {status = fading },
1472
+ Cons0 ),
1473
+ {State0 #? STATE {consumers = Cons }, Effects0 };
1472
1474
_ ->
1473
1475
% % no activation
1474
1476
{State0 , Effects0 }
@@ -1504,10 +1506,10 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
1504
1506
status = cancelled },
1505
1507
S0 ), Effects0 };
1506
1508
_ ->
1507
- {S1 , Effects1 } = return_all (Meta , S0 , Effects0 , ConsumerKey , Consumer ),
1509
+ {S1 , Effects } = return_all (Meta , S0 , Effects0 , ConsumerKey , Consumer ),
1508
1510
{S1 #? STATE {consumers = maps :remove (ConsumerKey , S1 #? STATE .consumers ),
1509
1511
last_active = Ts },
1510
- Effects1 }
1512
+ Effects }
1511
1513
end .
1512
1514
1513
1515
apply_enqueue (#{index := RaftIdx ,
@@ -1743,8 +1745,7 @@ update_header(Key, UpdateFun, Default, ?TUPLE(Size, Expiry))
1743
1745
update_header (Key , UpdateFun , Default , #{size => Size ,
1744
1746
expiry => Expiry });
1745
1747
update_header (Key , UpdateFun , Default , Header )
1746
- when is_map (Header ) andalso
1747
- is_map_key (size , Header ) ->
1748
+ when is_map_key (size , Header ) ->
1748
1749
maps :update_with (Key , UpdateFun , Default , Header ).
1749
1750
1750
1751
get_msg_header (? MSG (_Idx , Header )) ->
@@ -2172,24 +2173,24 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
2172
2173
{Life , Mode } = Spec , Priority ,
2173
2174
#? STATE {cfg = # cfg {consumer_strategy = single_active },
2174
2175
consumers = Cons0 ,
2175
- waiting_consumers = Waiting ,
2176
- service_queue = _ServiceQueue0 } = State0 ) ->
2176
+ waiting_consumers = Waiting0 ,
2177
+ service_queue = _ServiceQueue0 } = State ) ->
2177
2178
% % if it is the current active consumer, just update
2178
2179
% % if it is a cancelled active consumer, add to waiting unless it is the only
2179
2180
% % one, then merge
2180
2181
case active_consumer (Cons0 ) of
2181
2182
{ConsumerKey , # consumer {status = up } = Consumer0 } ->
2182
2183
Consumer = merge_consumer (Meta , Consumer0 , ConsumerMeta ,
2183
2184
Spec , Priority ),
2184
- {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State0 )};
2185
+ {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State )};
2185
2186
undefined when is_map_key (ConsumerKey , Cons0 ) ->
2186
2187
% % there is no active consumer and the current consumer is in the
2187
2188
% % consumers map and thus must be cancelled, in this case we can just
2188
2189
% % merge and effectively make this the current active one
2189
2190
Consumer0 = maps :get (ConsumerKey , Cons0 ),
2190
2191
Consumer = merge_consumer (Meta , Consumer0 , ConsumerMeta ,
2191
2192
Spec , Priority ),
2192
- {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State0 )};
2193
+ {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State )};
2193
2194
_ ->
2194
2195
% % add as a new waiting consumer
2195
2196
Credit = included_credit (Mode ),
@@ -2202,9 +2203,8 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
2202
2203
credit_mode = Mode },
2203
2204
credit = Credit ,
2204
2205
delivery_count = DeliveryCount },
2205
- {Consumer ,
2206
- State0 #? STATE {waiting_consumers =
2207
- add_waiting ({ConsumerKey , Consumer }, Waiting )}}
2206
+ Waiting = add_waiting ({ConsumerKey , Consumer }, Waiting0 ),
2207
+ {Consumer , State #? STATE {waiting_consumers = Waiting }}
2208
2208
end .
2209
2209
2210
2210
add_waiting ({Key , _ } = New , Waiting ) ->
@@ -2496,10 +2496,8 @@ message_size(Msg) ->
2496
2496
false ->
2497
2497
% % probably only hit this for testing so ok to use erts_debug
2498
2498
{0 , erts_debug :size (Msg )}
2499
-
2500
2499
end .
2501
2500
2502
-
2503
2501
all_nodes (#? STATE {consumers = Cons0 ,
2504
2502
enqueuers = Enqs0 ,
2505
2503
waiting_consumers = WaitingConsumers0 }) ->
@@ -2577,9 +2575,10 @@ get_priority(#{priority := Priority}) ->
2577
2575
get_priority (#{args := Args }) ->
2578
2576
% % fallback, v3 option
2579
2577
case rabbit_misc :table_lookup (Args , <<" x-priority" >>) of
2580
- {_Key , Value } ->
2578
+ {_Type , Value } ->
2581
2579
Value ;
2582
- _ -> 0
2580
+ _ ->
2581
+ 0
2583
2582
end ;
2584
2583
get_priority (_ ) ->
2585
2584
0 .
@@ -2713,7 +2712,6 @@ consumer_key_from_id(ConsumerId, {_, _, I}) ->
2713
2712
consumer_key_from_id (_ConsumerId , none ) ->
2714
2713
error .
2715
2714
2716
-
2717
2715
consumer_cancel_info (ConsumerKey , #? STATE {consumers = Consumers }) ->
2718
2716
case Consumers of
2719
2717
#{ConsumerKey := # consumer {checked_out = Checked }} ->
0 commit comments