Skip to content

Commit c688169

Browse files
committed
QQ/Streams: Ensure open file handles are closed when a queue is deleted.
If a stream or quorum queue has opened a file to read a consumer message and the queue is then deleted, the reference to the file handle is lost and the file stays open until the end of the channel's lifetime.
1 parent 8806e56 commit c688169

File tree

4 files changed

+70
-9
lines changed

4 files changed

+70
-9
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
-export([
1515
init/1,
1616
init/2,
17+
close/1,
1718
checkout/4,
1819
cancel_checkout/3,
1920
enqueue/3,
@@ -755,6 +756,13 @@ handle_ra_event(QName, Leader, close_cached_segments,
755756
handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
756757
{eol, [{unblock, cluster_name(State)}]}.
757758

759+
%% @doc Releases the resources held by a client state.
%% When the state carries a segment cache, every entry is evicted from its
%% `ra_flru' cache; a state whose `cached_segments' is `undefined' needs no
%% clean-up. Always returns `ok'.
%% NOTE(review): evicting is presumably what closes the segment file handles
%% kept open by the cache -- confirm against the eviction handler used when
%% the cache is created.
-spec close(rabbit_fifo_client:state()) -> ok.
close(#state{cached_segments = {_, _, SegmentCache}}) ->
    %% the result of the eviction is deliberately ignored
    _ = ra_flru:evict_all(SegmentCache),
    ok;
close(#state{cached_segments = undefined}) ->
    ok.
765+
758766
%% @doc Attempts to enqueue a message using cast semantics. This provides no
759767
%% guarantees or retries if the message fails to achieve consensus or if the
760768
%% servers sent to happens not to be available. If the message is sent to a

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,9 @@ remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
407407
case maps:take(QRef, Ctxs0) of
408408
error ->
409409
State;
410-
{_, Ctxs} ->
410+
{#ctx{module = Mod,
411+
state = S}, Ctxs} ->
412+
ok = Mod:close(S),
411413
State#?STATE{ctxs = Ctxs}
412414
end.
413415

@@ -495,11 +497,10 @@ init() ->
495497

496498
-spec close(state()) -> ok.
497499
close(#?STATE{ctxs = Contexts}) ->
498-
maps:foreach(
499-
fun (_, #ctx{module = Mod,
500-
state = S}) ->
501-
ok = Mod:close(S)
502-
end, Contexts).
500+
maps:foreach(fun (_, #ctx{module = Mod,
501+
state = S}) ->
502+
ok = Mod:close(S)
503+
end, Contexts).
503504

504505
-spec new(amqqueue:amqqueue(), state()) -> state().
505506
new(Q, State) when ?is_amqqueue(Q) ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@
159159
-define(RPC_TIMEOUT, 1000).
160160
-define(START_CLUSTER_TIMEOUT, 5000).
161161
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
162-
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
163162
-define(TICK_INTERVAL, 5000). %% the ra server tick time
164163
-define(DELETE_TIMEOUT, 5000).
165164
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
@@ -230,8 +229,8 @@ init(Q) when ?is_amqqueue(Q) ->
230229
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
231230

232231
-spec close(rabbit_fifo_client:state()) -> ok.
233-
close(_State) ->
234-
ok.
232+
close(State) ->
233+
rabbit_fifo_client:close(State).
235234

236235
-spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
237236
rabbit_fifo_client:state().

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ all_tests() ->
197197
requeue_multiple_true,
198198
requeue_multiple_false,
199199
subscribe_from_each,
200+
dont_leak_file_handles,
200201
leader_health_check
201202
].
202203

@@ -1629,6 +1630,54 @@ subscribe_from_each(Config) ->
16291630

16301631
ok.
16311632

1633+
%% Test case: deleting a quorum queue must release resources held on behalf of
%% a consumer by its channel process. The check is indirect: it counts the
%% reference-typed entries in the channel's `monitored_by' list before and
%% after the queue is deleted and asserts that the count went down.
dont_leak_file_handles(Config) ->
1634+
1635+
[Server0 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1636+
1637+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1638+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
1639+
QQ = ?config(queue_name, Config),
1640+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1641+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1642+
%% publish one message per configured server node
[begin
1643+
publish_confirm(Ch, QQ)
1644+
end || _ <- Servers],
1645+
timer:sleep(100),
1646+
%% roll the wal to force consumer messages to be read from disk
1647+
[begin
1648+
ok = rpc:call(S, ra_log_wal, force_roll_over, [ra_log_wal])
1649+
end || S <- Servers],
1650+
%% NOTE(review): fixed sleep, presumably to let the roll-over complete --
%% confirm that this is long enough on slow CI hosts
timer:sleep(256),
1651+
1652+
C = rabbit_ct_client_helpers:open_channel(Config, Server0),
1653+
%% NOTE(review): assumes exactly two channels exist on Server0 and that the
%% second list element is the process backing C -- confirm the ordering
[_, NCh1] = rpc:call(Server0, rabbit_channel, list, []),
1654+
%% qos/3 and subscribe/3 are suite helpers; presumably prefetch 1 and a
%% manual-ack consumer -- verify against their definitions
qos(C, 1, false),
1655+
subscribe(C, QQ, false),
1656+
%% receive and ack one delivery per server node; fail the test case if a
%% delivery does not arrive within 5000 ms
[begin
1657+
receive
1658+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
1659+
amqp_channel:call(C, #'basic.ack'{delivery_tag = DeliveryTag})
1660+
after 5000 ->
1661+
flush(1),
1662+
ct:fail("basic.deliver timeout")
1663+
end
1664+
end || _ <- Servers],
1665+
flush(1),
1666+
%% baseline: number of reference-typed monitors on the channel process
[{_, MonBy2}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
1667+
NumMonRefsBefore = length([M || M <- MonBy2, is_reference(M)]),
1668+
%% delete queue
1669+
?assertMatch(#'queue.delete_ok'{},
1670+
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
1671+
%% NOTE(review): sampled immediately after queue.delete_ok; if the channel
%% process reacts to the deletion asynchronously this may be racy -- confirm
[{_, MonBy3}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
1672+
NumMonRefsAfter = length([M || M <- MonBy3, is_reference(M)]),
1673+
%% this isn't an ideal way to assert this but every file handle creates
1674+
%% a monitor that (currently?) is a reference so we assert that we have
1675+
%% fewer reference monitors after
1676+
?assert(NumMonRefsAfter < NumMonRefsBefore),
1677+
1678+
%% only the consumer channel C is closed explicitly here
rabbit_ct_client_helpers:close_channel(C),
1679+
ok.
1680+
16321681
gh_12635(Config) ->
16331682
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
16341683
[Server0, _Server1, Server2] =
@@ -4946,3 +4995,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
49464995
ensure_qq_proc_dead(Config, Server, RaName)
49474996
end.
49484997

4998+
%% Debug helper: runs `lsof -p <pid>' for the OS process of the Erlang node
%% this function executes on and returns whatever os:cmd/1 produces.
%% NOTE(review): the name suggests it is meant to be invoked via rpc on a
%% broker node; no caller is visible in this hunk -- confirm it is used.
lsof_rpc() ->
    OsPid = os:getpid(),
    LsofCmd = rabbit_misc:format("lsof -p ~ts", [OsPid]),
    os:cmd(LsofCmd).

0 commit comments

Comments
 (0)