Skip to content

Commit 5d21173

Browse files
dcorbachomichaelklishin
authored andcommitted
Retrieve oldest message received timestamp
1 parent b84a234 commit 5d21173

File tree

3 files changed

+96
-1
lines changed

3 files changed

+96
-1
lines changed

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,8 @@ info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
413413
end, nothing, BQSs);
414414
info(head_message_timestamp, #state{bq = BQ, bqss = BQSs}) ->
415415
find_head_message_timestamp(BQ, BQSs, '');
416+
info(oldest_message_received_timestamp, #state{bq = BQ, bqss = BQSs}) ->
417+
find_oldest_message_received_timestamp(BQ, BQSs);
416418
info(online, _) ->
417419
'';
418420
info(Item, #state{bq = BQ, bqss = BQSs}) ->
@@ -689,6 +691,28 @@ find_head_message_timestamp(BQ, [{_, BQSN} | Rest], Timestamp) ->
689691
find_head_message_timestamp(_, [], Timestamp) ->
690692
Timestamp.
691693

694+
find_oldest_message_received_timestamp(BQ, BQs) ->
695+
%% Oldest message timestamp among all priority queues
696+
Timestamps =
697+
lists:foldl(
698+
fun({_, BQSN}, Acc) ->
699+
case oldest_message_received_timestamp(BQ, BQSN) of
700+
'' -> Acc;
701+
Ts -> [Ts | Acc]
702+
end
703+
end, [], BQs),
704+
case Timestamps of
705+
[] -> '';
706+
_ -> lists:min(Timestamps)
707+
end.
708+
709+
oldest_message_received_timestamp(BQ, BQSN) ->
710+
MsgCount = BQ:len(BQSN) + BQ:info(messages_unacknowledged_ram, BQSN),
711+
if
712+
MsgCount =/= 0 -> BQ:info(oldest_message_received_timestamp, BQSN);
713+
true -> ''
714+
end.
715+
692716
zip_msgs_and_acks(Pubs, AckTags) ->
693717
lists:zipwith(
694718
fun ({Msg, _Props}, AckTag) ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,10 @@ info(head_message_timestamp, #vqstate{
833833
q3 = Q3,
834834
ram_pending_ack = RPA}) ->
835835
head_message_timestamp(Q3, RPA);
836+
info(oldest_message_received_timestamp, #vqstate{
837+
q3 = Q3,
838+
ram_pending_ack = RPA}) ->
839+
oldest_message_received_timestamp(Q3, RPA);
836840
info(disk_reads, #vqstate{disk_read_count = Count}) ->
837841
Count;
838842
info(disk_writes, #vqstate{disk_write_count = Count}) ->
@@ -1194,7 +1198,6 @@ convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index0, V2Store0,
11941198
%% regarded as unprocessed until acked, this also prevents the result
11951199
%% apparently oscillating during repeated rejects.
11961200
%%
1197-
%% @todo OK I think we can do this differently
11981201
head_message_timestamp(Q3, RPA) ->
11991202
HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
12001203
HeadMsgStatus <-
@@ -1215,6 +1218,26 @@ head_message_timestamp(Q3, RPA) ->
12151218
false -> lists:min(Timestamps)
12161219
end.
12171220

1221+
oldest_message_received_timestamp(Q3, RPA) ->
1222+
HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
1223+
HeadMsgStatus <-
1224+
[ get_q_head(Q3),
1225+
get_pa_head(RPA) ],
1226+
HeadMsgStatus /= undefined,
1227+
HeadMsgStatus#msg_status.msg /= undefined ],
1228+
1229+
Timestamps =
1230+
[Timestamp
1231+
|| HeadMsg <- HeadMsgs,
1232+
Timestamp <- [mc:get_annotation(rts, HeadMsg)],
1233+
Timestamp /= undefined
1234+
],
1235+
1236+
case Timestamps == [] of
1237+
true -> '';
1238+
false -> lists:min(Timestamps)
1239+
end.
1240+
12181241
get_q_head(Q) ->
12191242
?QUEUE:get(Q, undefined).
12201243

deps/rabbit/test/priority_queue_SUITE.erl

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ groups() ->
2929
dropwhile_fetchwhile,
3030
info_head_message_timestamp,
3131
info_backing_queue_version,
32+
info_oldest_message_received_timestamp,
3233
unknown_info_key,
3334
matching,
3435
purge,
@@ -415,6 +416,53 @@ info_backing_queue_version(Config) ->
415416
passed
416417
end.
417418

419+
info_oldest_message_received_timestamp(Config) ->
420+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
421+
?MODULE, info_oldest_message_received_timestamp1, [Config]).
422+
423+
info_oldest_message_received_timestamp1(_Config) ->
424+
QName = rabbit_misc:r(<<"/">>, queue,
425+
<<"info_oldest_message_received_timestamp-queue">>),
426+
ExName = rabbit_misc:r(<<"/">>, exchange, <<>>),
427+
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
428+
Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
429+
PQ = rabbit_priority_queue,
430+
BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
431+
%% The queue is empty: no timestamp.
432+
true = PQ:is_empty(BQS1),
433+
'' = PQ:info(oldest_message_received_timestamp, BQS1),
434+
%% Publish one message.
435+
Content1 = #content{properties = #'P_basic'{priority = 1},
436+
payload_fragments_rev = []},
437+
{ok, Msg1} = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
438+
BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(),
439+
noflow, BQS1),
440+
Ts1 = PQ:info(oldest_message_received_timestamp, BQS2),
441+
?assert(is_integer(Ts1)),
442+
%% Publish a higher priority message.
443+
Content2 = #content{properties = #'P_basic'{priority = 2},
444+
payload_fragments_rev = []},
445+
{ok, Msg2} = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
446+
BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(),
447+
noflow, BQS2),
448+
%% Even though is highest priority, the lower priority message is older.
449+
%% Timestamp hasn't changed.
450+
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS3)),
451+
%% Consume message.
452+
{{Msg2, _, _}, BQS4} = PQ:fetch(false, BQS3),
453+
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS4)),
454+
%% Consume the first message, but do not acknowledge it
455+
%% yet. The goal is to verify that the unacknowledged message's
456+
%% timestamp is returned.
457+
{{Msg1, _, AckTag}, BQS5} = PQ:fetch(true, BQS4),
458+
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS5)),
459+
%% Ack message. The queue is empty now.
460+
{[<<"msg1">>], BQS6} = PQ:ack([AckTag], BQS5),
461+
true = PQ:is_empty(BQS6),
462+
?assertEqual('', PQ:info(oldest_message_received_timestamp, BQS6)),
463+
PQ:delete_and_terminate(a_whim, BQS6),
464+
passed.
465+
418466
unknown_info_key(Config) ->
419467
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
420468
Q = <<"info-priority-queue">>,

0 commit comments

Comments
 (0)