Skip to content

Commit 1b29467

Browse files
Merge pull request #14138 from rabbitmq/mergify/bp/v4.1.x/pr-14127
QQ/Streams: Ensure open file handles are closed when a queue is deleted. (backport #14127)
2 parents 5a41a13 + 8b40a8e commit 1b29467

File tree

4 files changed

+70
-9
lines changed

4 files changed

+70
-9
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
-export([
1515
init/1,
1616
init/2,
17+
close/1,
1718
checkout/4,
1819
cancel_checkout/3,
1920
enqueue/3,
@@ -755,6 +756,13 @@ handle_ra_event(QName, Leader, close_cached_segments,
755756
handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
756757
{eol, [{unblock, cluster_name(State)}]}.
757758

759+
-spec close(rabbit_fifo_client:state()) -> ok.
760+
close(#state{cached_segments = undefined}) ->
761+
ok;
762+
close(#state{cached_segments = {_, _, Flru}}) ->
763+
_ = ra_flru:evict_all(Flru),
764+
ok.
765+
758766
%% @doc Attempts to enqueue a message using cast semantics. This provides no
759767
%% guarantees or retries if the message fails to achieve consensus or if the
760768
%% servers sent to happens not to be available. If the message is sent to a

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,9 @@ remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
414414
case maps:take(QRef, Ctxs0) of
415415
error ->
416416
State;
417-
{_, Ctxs} ->
417+
{#ctx{module = Mod,
418+
state = S}, Ctxs} ->
419+
ok = Mod:close(S),
418420
State#?STATE{ctxs = Ctxs}
419421
end.
420422

@@ -502,11 +504,10 @@ init() ->
502504

503505
-spec close(state()) -> ok.
504506
close(#?STATE{ctxs = Contexts}) ->
505-
maps:foreach(
506-
fun (_, #ctx{module = Mod,
507-
state = S}) ->
508-
ok = Mod:close(S)
509-
end, Contexts).
507+
maps:foreach(fun (_, #ctx{module = Mod,
508+
state = S}) ->
509+
ok = Mod:close(S)
510+
end, Contexts).
510511

511512
-spec new(amqqueue:amqqueue(), state()) -> state().
512513
new(Q, State) when ?is_amqqueue(Q) ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@
143143
-define(RPC_TIMEOUT, 1000).
144144
-define(START_CLUSTER_TIMEOUT, 5000).
145145
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
146-
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
147146
-define(TICK_INTERVAL, 5000). %% the ra server tick time
148147
-define(DELETE_TIMEOUT, 5000).
149148
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
@@ -214,8 +213,8 @@ init(Q) when ?is_amqqueue(Q) ->
214213
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
215214

216215
-spec close(rabbit_fifo_client:state()) -> ok.
217-
close(_State) ->
218-
ok.
216+
close(State) ->
217+
rabbit_fifo_client:close(State).
219218

220219
-spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
221220
rabbit_fifo_client:state().

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ all_tests() ->
197197
requeue_multiple_true,
198198
requeue_multiple_false,
199199
subscribe_from_each,
200+
dont_leak_file_handles,
200201
leader_health_check
201202
].
202203

@@ -1641,6 +1642,54 @@ subscribe_from_each(Config) ->
16411642

16421643
ok.
16431644

1645+
dont_leak_file_handles(Config) ->
1646+
1647+
[Server0 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1648+
1649+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1650+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
1651+
QQ = ?config(queue_name, Config),
1652+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1653+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1654+
[begin
1655+
publish_confirm(Ch, QQ)
1656+
end || _ <- Servers],
1657+
timer:sleep(100),
1658+
%% roll the wal to force consumer messages to be read from disk
1659+
[begin
1660+
ok = rpc:call(S, ra_log_wal, force_roll_over, [ra_log_wal])
1661+
end || S <- Servers],
1662+
timer:sleep(256),
1663+
1664+
C = rabbit_ct_client_helpers:open_channel(Config, Server0),
1665+
[_, NCh1] = rpc:call(Server0, rabbit_channel, list, []),
1666+
qos(C, 1, false),
1667+
subscribe(C, QQ, false),
1668+
[begin
1669+
receive
1670+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
1671+
amqp_channel:call(C, #'basic.ack'{delivery_tag = DeliveryTag})
1672+
after 5000 ->
1673+
flush(1),
1674+
ct:fail("basic.deliver timeout")
1675+
end
1676+
end || _ <- Servers],
1677+
flush(1),
1678+
[{_, MonBy2}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
1679+
NumMonRefsBefore = length([M || M <- MonBy2, is_reference(M)]),
1680+
%% delete queue
1681+
?assertMatch(#'queue.delete_ok'{},
1682+
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
1683+
[{_, MonBy3}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]),
1684+
NumMonRefsAfter = length([M || M <- MonBy3, is_reference(M)]),
1685+
%% this isn't an ideal way to assert this but every file handle creates
1686+
%% a monitor that (currenlty?) is a reference so we assert that we have
1687+
%% fewer reference monitors after
1688+
?assert(NumMonRefsAfter < NumMonRefsBefore),
1689+
1690+
rabbit_ct_client_helpers:close_channel(C),
1691+
ok.
1692+
16441693
gh_12635(Config) ->
16451694
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
16461695
[Server0, _Server1, Server2] =
@@ -4949,3 +4998,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
49494998
ensure_qq_proc_dead(Config, Server, RaName)
49504999
end.
49515000

5001+
lsof_rpc() ->
5002+
Cmd = rabbit_misc:format(
5003+
"lsof -p ~ts", [os:getpid()]),
5004+
os:cmd(Cmd).

0 commit comments

Comments
 (0)