Skip to content

Commit e11e6e2

Browse files
committed
fixes
1 parent 35cf71b commit e11e6e2

File tree

2 files changed

+23
-20
lines changed

2 files changed

+23
-20
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,9 @@ query_messages_total(State) ->
11721172
messages_total(State).
11731173

11741174
query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
1175-
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
1175+
Cons = maps:fold(fun(_, ?CONSUMER_PID(P) = V, S) ->
1176+
S#{P => V}
1177+
end, #{}, Cons0),
11761178
maps:keys(maps:merge(Enqs, Cons)).
11771179

11781180

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ basics(Config) ->
9393
ConsumerTag = UId,
9494
ok = start_cluster(ClusterName, [ServerId]),
9595
FState0 = rabbit_fifo_client:init([ServerId]),
96-
{ok, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1},
97-
#{}, FState0),
96+
{ok, _, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1},
97+
#{}, FState0),
9898

9999
rabbit_quorum_queue:wal_force_roll_over(node()),
100100
% create segment the segment will trigger a snapshot
@@ -184,7 +184,7 @@ duplicate_delivery(Config) ->
184184
ServerId = ?config(node_id, Config),
185185
ok = start_cluster(ClusterName, [ServerId]),
186186
F0 = rabbit_fifo_client:init([ServerId]),
187-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
187+
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
188188
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1),
189189
Fun = fun Loop(S0) ->
190190
receive
@@ -219,7 +219,7 @@ usage(Config) ->
219219
ServerId = ?config(node_id, Config),
220220
ok = start_cluster(ClusterName, [ServerId]),
221221
F0 = rabbit_fifo_client:init([ServerId]),
222-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
222+
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
223223
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1),
224224
{ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, corr2, msg2, F2),
225225
{_, _, _} = process_ra_events(receive_ra_events(2, 2), ClusterName, F3),
@@ -272,7 +272,7 @@ detects_lost_delivery(Config) ->
272272
F000 = rabbit_fifo_client:init([ServerId]),
273273
{ok, F00, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F000),
274274
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), ClusterName, F00),
275-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
275+
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
276276
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F1),
277277
{ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, msg3, F2),
278278
% lose first delivery
@@ -301,9 +301,9 @@ returns_after_down(Config) ->
301301
Self = self(),
302302
_Pid = spawn(fun () ->
303303
F = rabbit_fifo_client:init([ServerId]),
304-
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>,
305-
{simple_prefetch, 10},
306-
#{}, F),
304+
{ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>,
305+
{simple_prefetch, 10},
306+
#{}, F),
307307
Self ! checkout_done
308308
end),
309309
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
@@ -382,7 +382,7 @@ discard(Config) ->
382382
_ = ra:members(ServerId),
383383

384384
F0 = rabbit_fifo_client:init([ServerId]),
385-
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
385+
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
386386
#{}, F0),
387387
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1),
388388
F3 = discard_next_delivery(ClusterName, F2, 5000),
@@ -405,8 +405,8 @@ cancel_checkout(Config) ->
405405
ok = start_cluster(ClusterName, [ServerId]),
406406
F0 = rabbit_fifo_client:init([ServerId], 4),
407407
{ok, F1, []} = rabbit_fifo_client:enqueue(ClusterName, m1, F0),
408-
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
409-
#{}, F1),
408+
{ok, _, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
409+
#{}, F1),
410410
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), ClusterName, F2,
411411
[], [], fun (_, S) -> S end),
412412
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
@@ -424,7 +424,7 @@ lost_delivery(Config) ->
424424
{_, _, F2} = process_ra_events(
425425
receive_ra_events(1, 0), ClusterName, F1, [], [],
426426
fun (_, S) -> S end),
427-
{ok, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2),
427+
{ok, _, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2),
428428
%% drop a delivery, simulating e.g. a full distribution buffer
429429
receive
430430
{ra_event, _, Evt} ->
@@ -449,6 +449,7 @@ lost_delivery(Config) ->
449449
ok.
450450

451451
credit_api_v1(Config) ->
452+
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end),
452453
ClusterName = ?config(cluster_name, Config),
453454
ServerId = ?config(node_id, Config),
454455
ok = start_cluster(ClusterName, [ServerId]),
@@ -458,7 +459,7 @@ credit_api_v1(Config) ->
458459
{_, _, F3} = process_ra_events(receive_ra_events(2, 0), ClusterName, F2),
459460
%% checkout with 0 prefetch
460461
CTag = <<"my-tag">>,
461-
{ok, F4} = rabbit_fifo_client:checkout(CTag, credited, #{}, F3),
462+
{ok, _, F4} = rabbit_fifo_client:checkout(CTag, {credited, 0}, #{}, F3),
462463
%% assert no deliveries
463464
{_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [],
464465
fun
@@ -505,9 +506,9 @@ credit_api_v2(Config) ->
505506
CTag = <<"my-tag">>,
506507
DC0 = 16#ff_ff_ff_ff,
507508
DC1 = 0, %% = DC0 + 1 using 32 bit serial number arithmetic
508-
{ok, F4} = rabbit_fifo_client:checkout(
509+
{ok, _, F4} = rabbit_fifo_client:checkout(
509510
%% initial_delivery_count in consumer meta means credit API v2.
510-
CTag, credited, #{initial_delivery_count => DC0}, F3),
511+
CTag, {credited, DC0}, #{}, F3),
511512
%% assert no deliveries
512513
{_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [],
513514
fun
@@ -609,7 +610,7 @@ test_queries(Config) ->
609610
exit(ready_timeout)
610611
end,
611612
F0 = rabbit_fifo_client:init([ServerId], 4),
612-
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0),
613+
{ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0),
613614
{ok, {_, Ready}, _} = ra:local_query(ServerId,
614615
fun rabbit_fifo:query_messages_ready/1),
615616
?assertEqual(1, Ready),
@@ -637,8 +638,8 @@ dequeue(Config) ->
637638
{ok, F2_, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1b),
638639
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F2_),
639640

640-
% {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
641-
{ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
641+
{ok, _, {_, _, 0, _, msg1}, F3} =
642+
rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
642643
{ok, F4_, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F3),
643644
{_, _, F4} = process_ra_events(receive_ra_events(1, 0), ClusterName, F4_),
644645
{ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(ClusterName, Tag, unsettled, F4),
@@ -698,7 +699,7 @@ receive_ra_events(Acc) ->
698699
end.
699700

700701
process_ra_events(Events, ClusterName, State) ->
701-
DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) ->
702+
DeliveryFun = fun ({deliver, Tag, _, Msgs}, S) ->
702703
MsgIds = [element(1, M) || M <- Msgs],
703704
{S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S),
704705
S0

0 commit comments

Comments
 (0)