Skip to content

Commit 0e6ffa6

Browse files
ansdmergify[bot]
authored andcommitted
Fix AMQP connection throttling
Prior to this change the AMQP connection was never blocked, even in the event of memory or disk alarms. (cherry picked from commit 5a870df) # Conflicts: # deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
1 parent fef35ae commit 0e6ffa6

File tree

2 files changed

+60
-2
lines changed

2 files changed

+60
-2
lines changed

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ handle_1_0_session_frame(Channel, Frame, State) ->
458458
case Frame of
459459
#'v1_0.end'{} ->
460460
untrack_channel(Channel, State);
461-
#'v1_0.transfer'{} ->
461+
{#'v1_0.transfer'{}, _MsgPart} ->
462462
case (State#v1.connection_state =:= blocking) of
463463
true ->
464464
ok = rabbit_heartbeat:pause_monitor(

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@ groups() ->
2929
roundtrip_classic_queue_with_drain,
3030
roundtrip_quorum_queue_with_drain,
3131
roundtrip_stream_queue_with_drain,
32+
<<<<<<< HEAD
3233
message_headers_conversion
34+
=======
35+
amqp_stream_amqpl,
36+
message_headers_conversion,
37+
resource_alarm
38+
>>>>>>> 5a870df54c (Fix AMQP connection throttling)
3339
]},
3440
{metrics, [], [
3541
auth_attempt_metrics
@@ -332,6 +338,58 @@ message_headers_conversion(Config) ->
332338
ok = amqp10_client:close_connection(Connection),
333339
ok.
334340

341+
resource_alarm(Config) ->
342+
Host = ?config(rmq_hostname, Config),
343+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
344+
QName = atom_to_binary(?FUNCTION_NAME, utf8),
345+
Address = <<"/amq/queue/", QName/binary>>,
346+
Ch = rabbit_ct_client_helpers:open_channel(Config),
347+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
348+
349+
OpnConf = #{address => Host,
350+
port => Port,
351+
container_id => atom_to_binary(?FUNCTION_NAME),
352+
sasl => {plain, <<"guest">>, <<"guest">>}},
353+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
354+
{ok, Session} = amqp10_client:begin_session(Connection),
355+
{ok, Sender} = create_amqp10_sender(Session, Address),
356+
357+
M1 = amqp10_msg:new(<<"t1">>, <<"m1">>, false),
358+
M2 = amqp10_msg:new(<<"t2">>, <<"m2">>, false),
359+
M3 = amqp10_msg:new(<<"t3">>, <<"m3">>, false),
360+
M4 = amqp10_msg:new(<<"t4">>, <<"m4">>, false),
361+
362+
ok = amqp10_client:send_msg(Sender, M1),
363+
ok = wait_for_settlement(<<"t1">>),
364+
365+
ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
366+
%% Let connection block.
367+
timer:sleep(100),
368+
369+
ok = amqp10_client:send_msg(Sender, M2),
370+
ok = amqp10_client:send_msg(Sender, M3),
371+
ok = amqp10_client:send_msg(Sender, M4),
372+
373+
%% M2 still goes through, but M3 should get blocked. (Server is off by one message.)
374+
receive {amqp10_disposition, {accepted, <<"t2">>}} -> ok
375+
after 300 -> ct:fail({accepted_timeout, ?LINE})
376+
end,
377+
receive {amqp10_disposition, {accepted, <<"t3">>}} -> ct:fail("expected connection to be blocked")
378+
after 300 -> ok
379+
end,
380+
381+
%% Unblock connection.
382+
ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
383+
384+
%% Previously sent M3 and M4 should now be processed on the server.
385+
receive {amqp10_disposition, {accepted, <<"t3">>}} -> ok
386+
after 5000 -> ct:fail({accepted_timeout, ?LINE})
387+
end,
388+
ok = wait_for_settlement(<<"t4">>),
389+
390+
delete_queue(Config, QName),
391+
ok = amqp10_client:close_connection(Connection).
392+
335393
amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
336394
{ok, Sender} = create_amqp10_sender(Session, Address),
337395

@@ -454,7 +512,7 @@ wait_for_settlement(Tag, State) ->
454512
ok
455513
after 5000 ->
456514
flush("wait_for_settlement timed out"),
457-
ct:fail(settled_timeout)
515+
ct:fail({settled_timeout, Tag})
458516
end.
459517

460518
wait_for_accepts(0) -> ok;

0 commit comments

Comments
 (0)