Skip to content

Commit 93dfb0f

Browse files
committed
QQ first draft hi/lo priority queue
1 parent c681321 commit 93dfb0f

File tree

4 files changed

+145
-88
lines changed

4 files changed

+145
-88
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
-dialyzer(no_improper_lists).
1515

1616
-include("rabbit_fifo.hrl").
17-
-include_lib("rabbit_common/include/rabbit.hrl").
1817

1918
-define(STATE, ?MODULE).
2019

@@ -739,8 +738,9 @@ apply(_Meta, Cmd, State) ->
739738
{State, ok, []}.
740739

741740
convert_v3_to_v4(#{system_time := Ts},
742-
#rabbit_fifo{messages = Messages0,
743-
consumers = Consumers0} = StateV3) ->
741+
StateV3) ->
742+
Messages0 = rabbit_fifo_v3:get_field(messages, StateV3),
743+
Consumers0 = rabbit_fifo_v3:get_field(consumers, StateV3),
744744
Messages = rabbit_fifo_q:from_lqueue(Messages0),
745745
Consumers = maps:map(
746746
fun (_CKey, #consumer{checked_out = Ch0} = C) ->
@@ -750,8 +750,23 @@ convert_v3_to_v4(#{system_time := Ts},
750750
end, Ch0),
751751
C#consumer{checked_out = Ch}
752752
end, Consumers0),
753-
StateV3#?MODULE{messages = Messages,
754-
consumers = Consumers}.
753+
#?MODULE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
754+
messages = Messages,
755+
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
756+
returns = rabbit_fifo_v3:get_field(returns, StateV3),
757+
enqueue_count = rabbit_fifo_v3:get_field(enqueue_count, StateV3),
758+
enqueuers = rabbit_fifo_v3:get_field(enqueuers, StateV3),
759+
ra_indexes = rabbit_fifo_v3:get_field(ra_indexes, StateV3),
760+
release_cursors = rabbit_fifo_v3:get_field(release_cursors, StateV3),
761+
consumers = Consumers,
762+
% consumers that require further service are queued here
763+
service_queue = rabbit_fifo_v3:get_field(service_queue, StateV3),
764+
dlx = rabbit_fifo_v3:get_field(dlx, StateV3),
765+
msg_bytes_enqueue = rabbit_fifo_v3:get_field(msg_bytes_enqueue, StateV3),
766+
msg_bytes_checkout = rabbit_fifo_v3:get_field(msg_bytes_checkout, StateV3),
767+
waiting_consumers = rabbit_fifo_v3:get_field(waiting_consumers, StateV3),
768+
last_active = rabbit_fifo_v3:get_field(last_active, StateV3),
769+
msg_cache = rabbit_fifo_v3:get_field(msg_cache, StateV3)}.
755770

756771
purge_node(Meta, Node, State, Effects) ->
757772
lists:foldl(fun(Pid, {S0, E0}) ->
@@ -1605,19 +1620,6 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16051620
{State0, Effects}
16061621
end.
16071622

1608-
maybe_set_msg_ttl(#basic_message{content = #content{properties = none}},
1609-
RaCmdTs, Header,
1610-
#?STATE{cfg = #cfg{msg_ttl = PerQueueMsgTTL}}) ->
1611-
update_expiry_header(RaCmdTs, PerQueueMsgTTL, Header);
1612-
maybe_set_msg_ttl(#basic_message{content = #content{properties = Props}},
1613-
RaCmdTs, Header,
1614-
#?STATE{cfg = #cfg{msg_ttl = PerQueueMsgTTL}}) ->
1615-
%% rabbit_quorum_queue will leave the properties decoded if and only if
1616-
%% per message message TTL is set.
1617-
%% We already check in the channel that expiration must be valid.
1618-
{ok, PerMsgMsgTTL} = rabbit_basic:parse_expiration(Props),
1619-
TTL = min(PerMsgMsgTTL, PerQueueMsgTTL),
1620-
update_expiry_header(RaCmdTs, TTL, Header);
16211623
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16221624
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16231625
case mc:is(Msg) of
@@ -1832,10 +1834,10 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
18321834
#?STATE{cfg = Cfg,
18331835
release_cursors = Cursors0} = State0,
18341836
Effects) ->
1835-
Total = messages_total(State0),
1837+
% Total = messages_total(State0),
18361838
%% TODO: optimise
18371839
case smallest_raft_index(State0) of
1838-
undefined when Total == 0 ->
1840+
undefined ->
18391841
% there are no messages on queue anymore and no pending enqueues
18401842
% we can forward release_cursor all the way until
18411843
% the last received command, hooray
@@ -1846,8 +1848,8 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
18461848
release_cursors = lqueue:new(),
18471849
enqueue_count = 0},
18481850
{State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
1849-
undefined ->
1850-
{State0, Reply, Effects};
1851+
% undefined ->
1852+
% {State0, Reply, Effects};
18511853
Smallest when is_integer(Smallest) ->
18521854
case find_next_cursor(Smallest, Cursors0) of
18531855
empty ->
@@ -2077,7 +2079,7 @@ take_next_msg(#?STATE{returns = Returns0,
20772079
case rabbit_fifo_q:out(Messages0) of
20782080
{empty, _} ->
20792081
empty;
2080-
{{value, ?MSG(RaftIdx, _) = Msg}, Messages} ->
2082+
{_P, ?MSG(RaftIdx, _) = Msg, Messages} ->
20812083
%% add index here
20822084
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
20832085
{Msg, State#?STATE{messages = Messages,
@@ -2418,7 +2420,8 @@ normalize(#?STATE{ra_indexes = _Indexes,
24182420
release_cursors = Cursors,
24192421
dlx = DlxState} = State) ->
24202422
State#?STATE{returns = lqueue:from_list(lqueue:to_list(Returns)),
2421-
messages = rabbit_fifo_q:from_lqueue(Messages),
2423+
messages = rabbit_fifo_q:normalize(Messages,
2424+
rabbit_fifo_q:new()),
24222425
release_cursors = lqueue:from_list(lqueue:to_list(Cursors)),
24232426
dlx = rabbit_fifo_dlx:normalize(DlxState)}.
24242427

@@ -2528,9 +2531,6 @@ add_bytes_return(Header,
25282531
State#?STATE{msg_bytes_checkout = Checkout - Size,
25292532
msg_bytes_enqueue = Enqueue + Size}.
25302533

2531-
message_size(#basic_message{content = Content}) ->
2532-
#content{payload_fragments_rev = PFR} = Content,
2533-
iolist_size(PFR);
25342534
message_size(B) when is_binary(B) ->
25352535
byte_size(B);
25362536
message_size(Msg) ->
@@ -2652,12 +2652,7 @@ smallest_raft_index(#?STATE{messages = Messages,
26522652
ra_indexes = Indexes,
26532653
dlx = DlxState}) ->
26542654
SmallestDlxRaIdx = rabbit_fifo_dlx:smallest_raft_index(DlxState),
2655-
SmallestMsgsRaIdx = case rabbit_fifo_q:get(Messages) of
2656-
?MSG(I, _) when is_integer(I) ->
2657-
I;
2658-
_ ->
2659-
undefined
2660-
end,
2655+
SmallestMsgsRaIdx = rabbit_fifo_q:get_lowest_index(Messages),
26612656
SmallestRaIdx = rabbit_fifo_index:smallest(Indexes),
26622657
lists:min([SmallestDlxRaIdx, SmallestMsgsRaIdx, SmallestRaIdx]).
26632658

deps/rabbit/src/rabbit_fifo_q.erl

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
-module(rabbit_fifo_q).
22

3+
-include("rabbit_fifo.hrl").
34
-export([
45
new/0,
56
in/3,
67
out/1,
78
get/1,
89
len/1,
9-
from_lqueue/1
10+
from_lqueue/1,
11+
normalize/2,
12+
get_lowest_index/1
1013
]).
1114

1215
-define(WEIGHT, 2).
@@ -26,14 +29,16 @@
2629
new() ->
2730
#?MODULE{}.
2831

29-
-spec in(hi | lo, term(), state()) -> state().
32+
-spec in(hi | lo, msg(), state()) -> state().
3033
in(hi, Item, #?MODULE{hi = Hi, len = Len} = State) ->
3134
State#?MODULE{hi = queue:in(Item, Hi),
3235
len = Len + 1};
3336
in(lo, Item, #?MODULE{lo = Lo, len = Len} = State) ->
3437
State#?MODULE{lo = queue:in(Item, Lo),
3538
len = Len + 1}.
3639

40+
-spec out(state()) ->
41+
{empty, state()} | {hi | lo, msg(), state()}.
3742
out(#?MODULE{len = 0} = S) ->
3843
{empty, S};
3944
out(#?MODULE{hi = Hi0,
@@ -45,29 +50,30 @@ out(#?MODULE{hi = Hi0,
4550
%% try lo before hi
4651
case queue:out(Lo0) of
4752
{empty, _} ->
48-
{{value, _} = Ret, Hi} = queue:out(Hi0),
49-
{Ret, State#?MODULE{hi = Hi,
50-
dequeue_counter = 0,
51-
len = Len - 1}};
52-
{Ret, Lo} ->
53-
{Ret, State#?MODULE{lo = Lo,
54-
dequeue_counter = 0,
55-
len = Len - 1}}
53+
{{value, Ret}, Hi} = queue:out(Hi0),
54+
{hi, Ret, State#?MODULE{hi = Hi,
55+
dequeue_counter = 0,
56+
len = Len - 1}};
57+
{{value, Ret}, Lo} ->
58+
{lo, Ret, State#?MODULE{lo = Lo,
59+
dequeue_counter = 0,
60+
len = Len - 1}}
5661
end;
5762
false ->
5863
case queue:out(Hi0) of
5964
{empty, _} ->
60-
{{value, _} = Ret, Lo} = queue:out(Lo0),
61-
{Ret, State#?MODULE{lo = Lo,
62-
dequeue_counter = C + 1,
63-
len = Len - 1}};
64-
{Ret, Hi} ->
65-
{Ret, State#?MODULE{hi = Hi,
66-
dequeue_counter = C + 1,
67-
len = Len - 1}}
65+
{{value, Ret}, Lo} = queue:out(Lo0),
66+
{lo, Ret, State#?MODULE{lo = Lo,
67+
dequeue_counter = C + 1,
68+
len = Len - 1}};
69+
{{value, Ret}, Hi} ->
70+
{hi, Ret, State#?MODULE{hi = Hi,
71+
dequeue_counter = C + 1,
72+
len = Len - 1}}
6873
end
6974
end.
7075

76+
-spec get(state()) -> empty | msg().
7177
get(#?MODULE{len = 0}) ->
7278
empty;
7379
get(#?MODULE{hi = Hi0,
@@ -78,27 +84,56 @@ get(#?MODULE{hi = Hi0,
7884
%% try lo before hi
7985
case queue:peek(Lo0) of
8086
empty ->
81-
queue:peek(Hi0);
82-
{value, _} = Ret ->
87+
{value, Ret} = queue:peek(Hi0),
88+
Ret;
89+
{value, Ret} ->
8390
Ret
8491
end;
8592
false ->
8693
case queue:peek(Hi0) of
8794
empty ->
88-
queue:peek(Lo0);
89-
{value, _} = Ret ->
95+
{value, Ret} = queue:peek(Lo0),
96+
Ret;
97+
{value, Ret} ->
9098
Ret
9199
end
92100
end.
93101

102+
-spec len(state()) -> non_neg_integer().
94103
len(#?MODULE{len = Len}) ->
95104
Len.
96105

106+
-spec from_lqueue(lqueue:lqueue(msg())) -> state().
97107
from_lqueue(LQ) ->
98-
lqueue:fold(
99-
fun (Item, Acc) ->
100-
in(lo, Item, Acc)
101-
end, new(), LQ).
108+
lqueue:fold(fun (Item, Acc) ->
109+
in(lo, Item, Acc)
110+
end, new(), LQ).
111+
112+
-spec normalize(state(), state()) -> state().
113+
normalize(Q0, Acc) ->
114+
case out(Q0) of
115+
{empty, _} ->
116+
Acc;
117+
{P, Msg, Q} ->
118+
normalize(Q, in(P, Msg, Acc))
119+
end.
120+
121+
-spec get_lowest_index(state()) -> undefined | ra:index().
122+
get_lowest_index(#?MODULE{len = 0}) ->
123+
undefined;
124+
get_lowest_index(#?MODULE{hi = Hi, lo = Lo}) ->
125+
case queue:peek(Hi) of
126+
empty ->
127+
{value, ?MSG(LoIdx, _)} = queue:peek(Lo),
128+
LoIdx;
129+
{value, ?MSG(HiIdx, _)} ->
130+
case queue:peek(Lo) of
131+
{value, ?MSG(LoIdx, _)} ->
132+
max(HiIdx, LoIdx);
133+
empty ->
134+
HiIdx
135+
end
136+
end.
102137

103138
%% internals
104139

deps/rabbit/src/rabbit_fifo_v3.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@
7070
make_update_config/1,
7171
make_garbage_collection/0,
7272
convert_v1_to_v2/1,
73-
convert_v2_to_v3/1
73+
convert_v2_to_v3/1,
74+
75+
get_field/2
7476
]).
7577

7678
-ifdef(TEST).
@@ -766,6 +768,21 @@ convert_v2_to_v3(#rabbit_fifo{consumers = ConsumersV2} = StateV2) ->
766768
end, ConsumersV2),
767769
StateV2#rabbit_fifo{consumers = ConsumersV3}.
768770

771+
get_field(Field, State) ->
772+
Fields = record_info(fields, ?STATE),
773+
Index = record_index_of(Field, Fields),
774+
element(Index, State).
775+
776+
record_index_of(F, Fields) ->
777+
index_of(2, F, Fields).
778+
779+
index_of(_, F, []) ->
780+
exit({field_not_found, F});
781+
index_of(N, F, [F | _]) ->
782+
N;
783+
index_of(N, F, [_ | T]) ->
784+
index_of(N+1, F, T).
785+
769786
convert_consumer_v2_to_v3(C = #consumer{cfg = Cfg = #consumer_cfg{credit_mode = simple_prefetch,
770787
meta = #{prefetch := Prefetch}}}) ->
771788
C#consumer{cfg = Cfg#consumer_cfg{credit_mode = {simple_prefetch, Prefetch}}};

0 commit comments

Comments
 (0)