Skip to content

Commit bb67af8

Browse files
committed
More test refactoring and new API fixes
rabbit_fifo_prop_SUITE refactoring and other fixes. fixss bzl bzl fixes
1 parent 95cfecc commit bb67af8

13 files changed

+818
-1139
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,7 @@ rabbitmq_suite(
733733
deps = [
734734
"//deps/rabbit_common:erlang_app",
735735
"@proper//:erlang_app",
736+
"@meck//:erlang_app",
736737
"@ra//:erlang_app",
737738
],
738739
)

deps/rabbit/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1371,7 +1371,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
13711371
hdrs = ["src/rabbit_fifo.hrl"],
13721372
app_name = "rabbit",
13731373
erlc_opts = "//:test_erlc_opts",
1374-
deps = ["//deps/rabbit_common:erlang_app"],
1374+
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
13751375
)
13761376
erlang_bytecode(
13771377
name = "rabbit_fifo_dlx_SUITE_beam_files",

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,5 +175,6 @@
175175
{quorum_queues_v4,
176176
#{desc => "Unlocks QQ v4 goodies",
177177
stability => stable,
178-
depends_on => [quorum_queue]
178+
depends_on => [quorum_queue,
179+
credit_api_v2]
179180
}}).

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 90 additions & 69 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@
7575
{simple_prefetch, MaxCredit :: non_neg_integer()}.
7676
%% determines how credit is replenished
7777

78-
-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
79-
credited,
80-
simple_prefetch} |
78+
-type checkout_spec() :: {once | auto,
79+
Num :: non_neg_integer(),
80+
credited | simple_prefetch} |
8181

8282
{dequeue, settled | unsettled} |
8383
cancel |

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
-define(SOFT_LIMIT, 32).
3939
-define(TIMER_TIME, 10000).
4040
-define(COMMAND_TIMEOUT, 30000).
41+
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
4142

4243
-type seq() :: non_neg_integer().
4344

@@ -118,6 +119,9 @@ enqueue(QName, Correlation, Msg,
118119
cfg = #cfg{servers = Servers,
119120
timeout = Timeout}} = State0) ->
120121
%% the first publish, register and enqueuer for this process.
122+
%% TODO: we _only_ need to pre-register an enqueuer to discover if the
123+
%% queue overflow is `reject_publish` and the queue can accept new messages
124+
%% if the queue does not have `reject_publish` set we can skip this step
121125
Reg = rabbit_fifo:make_register_enqueuer(self()),
122126
case ra:process_command(Servers, Reg, Timeout) of
123127
{ok, reject_publish, Leader} ->
@@ -335,19 +339,32 @@ discard(ConsumerTag, [_|_] = MsgIds,
335339
state()) ->
336340
{ok, ConsumerInfos :: map(), state()} |
337341
{error | timeout, term()}.
338-
checkout(ConsumerTag, CreditMode, Meta,
342+
checkout(ConsumerTag, CreditMode, #{} = Meta,
339343
#state{consumers = CDels0} = State0)
340-
when is_binary(ConsumerTag) ->
344+
when is_binary(ConsumerTag) andalso
345+
is_tuple(CreditMode) ->
341346
Servers = sorted_servers(State0),
342347
ConsumerId = consumer_id(ConsumerTag),
343-
NumUnsettled = case CreditMode of
344-
credited -> 0;
348+
Spec = case rabbit_fifo:is_v4() of
349+
true ->
350+
case CreditMode of
351+
{simple_prefetch, 0} ->
352+
{auto, {simple_prefetch,
353+
?UNLIMITED_PREFETCH_COUNT}};
354+
_ ->
355+
{auto, CreditMode}
356+
end;
357+
false ->
358+
case CreditMode of
359+
{credited, _} ->
360+
{auto, 0, credited};
361+
{simple_prefetch, 0} ->
362+
{auto, ?UNLIMITED_PREFETCH_COUNT, simple_prefetch};
345363
{simple_prefetch, Num} ->
346-
Num
347-
end,
348-
Cmd = rabbit_fifo:make_checkout(ConsumerId,
349-
{auto, NumUnsettled, CreditMode},
350-
Meta),
364+
{auto, Num, simple_prefetch}
365+
end
366+
end,
367+
Cmd = rabbit_fifo:make_checkout(ConsumerId, Spec, Meta),
351368
%% ???
352369
Ack = maps:get(ack, Meta, true),
353370

@@ -369,7 +386,7 @@ checkout(ConsumerTag, CreditMode, Meta,
369386
NextMsgId - 1
370387
end
371388
end,
372-
DeliveryCount = case maps:is_key(initial_delivery_count, Meta) of
389+
DeliveryCount = case rabbit_fifo:is_v4() of
373390
true -> credit_api_v2;
374391
false -> {credit_api_v1, 0}
375392
end,

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@
113113
-opaque state() :: #?STATE{}.
114114

115115
%% Delete atom 'credit_api_v1' when feature flag credit_api_v2 becomes required.
116-
-type consume_mode() :: {simple_prefetch, non_neg_integer()} |
116+
-type consume_mode() :: {simple_prefetch, Prefetch :: non_neg_integer()} |
117117
{credited, Initial :: delivery_count() | credit_api_v1}.
118118
-type consume_spec() :: #{no_ack := boolean(),
119119
channel_pid := pid(),

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -849,29 +849,19 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
849849
ConsumerTag = quorum_ctag(ConsumerTag0),
850850
%% consumer info is used to describe the consumer properties
851851
AckRequired = not NoAck,
852-
{CreditMode, DeclaredPrefetch, ConsumerMeta0} =
853-
case Mode of
854-
{credited, C} ->
855-
Meta = if C =:= credit_api_v1 ->
856-
#{};
857-
is_integer(C) ->
858-
#{initial_delivery_count => C}
859-
end,
860-
{credited, 0, Meta};
861-
{simple_prefetch = M, Declared} ->
862-
Effective = case Declared of
863-
0 -> ?UNLIMITED_PREFETCH_COUNT;
864-
_ -> Declared
865-
end,
866-
{{M, Effective}, Declared, #{}}
867-
end,
868-
ConsumerMeta = maps:merge(ConsumerMeta0,
869-
#{ack => AckRequired,
870-
prefetch => DeclaredPrefetch,
871-
args => Args,
872-
username => ActingUser}),
852+
Prefetch = case Mode of
853+
{simple_prefetch, Declared} ->
854+
Declared;
855+
_ ->
856+
0
857+
end,
858+
859+
ConsumerMeta = #{ack => AckRequired,
860+
prefetch => Prefetch,
861+
args => Args,
862+
username => ActingUser},
873863
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
874-
CreditMode, ConsumerMeta,
864+
Mode, ConsumerMeta,
875865
QState0),
876866
case single_active_consumer_on(Q) of
877867
true ->
@@ -887,10 +877,10 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
887877
rabbit_core_metrics:consumer_created(
888878
ChPid, ConsumerTag, ExclusiveConsume,
889879
AckRequired, QName,
890-
DeclaredPrefetch, ActivityStatus == single_active, %% Active
880+
Prefetch, ActivityStatus == single_active, %% Active
891881
ActivityStatus, Args),
892882
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
893-
AckRequired, QName, DeclaredPrefetch,
883+
AckRequired, QName, Prefetch,
894884
Args, none, ActingUser),
895885
{ok, QState};
896886
{error, Error} ->
@@ -902,10 +892,10 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
902892
rabbit_core_metrics:consumer_created(
903893
ChPid, ConsumerTag, ExclusiveConsume,
904894
AckRequired, QName,
905-
DeclaredPrefetch, true, %% Active
895+
Prefetch, true, %% Active
906896
up, Args),
907897
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
908-
AckRequired, QName, DeclaredPrefetch,
898+
AckRequired, QName, Prefetch,
909899
Args, none, ActingUser),
910900
{ok, QState}
911901
end.

deps/rabbit/test/amqp_credit_api_v2_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ credit_api_v2(Config) ->
126126

127127
?assertEqual(ok,
128128
rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)),
129+
?assertEqual(ok,
130+
rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queues_v4)),
129131
flush(enabled_feature_flag),
130132

131133
%% Consume with credit API v2

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3306,12 +3306,14 @@ cancel_consumer_gh_3729(Config) ->
33063306
ct:fail("basic.cancel_ok timeout")
33073307
end,
33083308

3309-
D = #'queue.declare'{queue = QQ, passive = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
3309+
D = #'queue.declare'{queue = QQ, passive = true,
3310+
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
33103311

33113312
F = fun() ->
33123313
#'queue.declare_ok'{queue = QQ,
33133314
message_count = MC,
33143315
consumer_count = CC} = amqp_channel:call(Ch, D),
3316+
ct:pal("Mc ~b CC ~b", [MC, CC]),
33153317
MC =:= 1 andalso CC =:= 0
33163318
end,
33173319
rabbit_ct_helpers:await_condition(F, 30000),

0 commit comments

Comments
 (0)