Skip to content

Commit d651f87

Browse files
committed
Share tests between MQTT and Web MQTT
New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that are executed against both MQTT and Web MQTT. This has two major advantages: 1. Eliminates test code duplication between rabbitmq_mqtt and rabbitmq_web_mqtt making the tests easier to maintain and to understand. 2. Increases test coverage of Web MQTT. It's acceptable to add a **test** dependency from rabbitmq_mqtt to rabbitmq_web_mqtt. Obviously, there should be no such dependency for non-test code.
1 parent 7c1aa49 commit d651f87

16 files changed

+661
-856
lines changed

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
add_code_path_to_node/2,
4040
add_code_path_to_all_nodes/2,
41-
rpc/5, rpc/6,
41+
rpc/4, rpc/5, rpc/6,
4242
rpc_all/4, rpc_all/5,
4343

4444
start_node/2,
@@ -1558,6 +1558,9 @@ add_code_path_to_all_nodes(Config, Module) ->
15581558
|| Nodename <- Nodenames],
15591559
ok.
15601560

1561+
rpc(Config, Module, Function, Args) ->
1562+
rpc(Config, 0, Module, Function, Args).
1563+
15611564
rpc(Config, Node, Module, Function, Args)
15621565
when is_atom(Node) andalso Node =/= undefined ->
15631566
rpc(Config, Node, Module, Function, Args, infinity);

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ dialyze(
8686
broker_for_integration_suites(
8787
extra_plugins = [
8888
"//deps/rabbitmq_management:erlang_app",
89+
"//deps/rabbitmq_web_mqtt:erlang_app",
8990
],
9091
)
9192

@@ -193,6 +194,7 @@ rabbitmq_integration_suite(
193194
"@emqtt//:erlang_app",
194195
],
195196
additional_beam = [
197+
":event_recorder",
196198
":util",
197199
],
198200
)
@@ -226,16 +228,18 @@ rabbitmq_integration_suite(
226228
)
227229

228230
rabbitmq_integration_suite(
229-
name = "integration_SUITE",
230-
size = "large",
231-
runtime_deps = [
232-
"@emqtt//:erlang_app",
233-
"//deps/rabbitmq_management_agent:erlang_app",
234-
],
235-
additional_beam = [
236-
":event_recorder",
237-
":util",
238-
],
231+
name = "shared_SUITE",
232+
size = "large",
233+
timeout = "eternal",
234+
runtime_deps = [
235+
"@emqtt//:erlang_app",
236+
"//deps/rabbitmq_management_agent:erlang_app",
237+
"@gun//:erlang_app",
238+
],
239+
additional_beam = [
240+
":event_recorder",
241+
":util",
242+
],
239243
)
240244

241245
assert_suites()

deps/rabbitmq_mqtt/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ BUILD_WITHOUT_QUIC=1
4040
export BUILD_WITHOUT_QUIC
4141

4242
DEPS = ranch rabbit_common rabbit amqp_client ra
43-
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management
43+
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt
4444

4545
dep_ct_helper = git https://github.com/extend/ct_helper.git master
4646
dep_emqtt = git https://github.com/emqx/emqtt.git 1.7.0-rc.2

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
%% quorum queues and streams whose soft limit has been exceeded
7777
soft_limit_exceeded = sets:new([{version, 2}]) :: sets:set(),
7878
qos0_messages_dropped = 0 :: non_neg_integer()
79-
}).
79+
}).
8080

8181
-opaque state() :: #state{}.
8282

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,14 +412,14 @@ run_socket(State = #state{ socket = Sock }) ->
412412
rabbit_net:setopts(Sock, [{active, once}]),
413413
State#state{ await_recv = true }.
414414

415-
control_throttle(State = #state{connection_state = Flow,
415+
control_throttle(State = #state{connection_state = ConnState,
416416
conserve = Conserve,
417417
keepalive = KState,
418418
proc_state = PState}) ->
419419
Throttle = Conserve orelse
420420
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
421421
credit_flow:blocked(),
422-
case {Flow, Throttle} of
422+
case {ConnState, Throttle} of
423423
{running, true} ->
424424
State#state{connection_state = blocked,
425425
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};

deps/rabbitmq_mqtt/test/cluster_SUITE.erl

Lines changed: 28 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -9,54 +9,52 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("eunit/include/eunit.hrl").
12-
-import(util, [expect_publishes/2,
12+
-import(util, [expect_publishes/3,
1313
connect/3,
14-
connect/4]).
14+
connect/4,
15+
await_exit/1]).
1516

1617
-import(rabbit_ct_broker_helpers,
1718
[setup_steps/0,
1819
teardown_steps/0,
1920
get_node_config/3,
2021
rabbitmqctl/3,
21-
rpc/5,
22-
stop_node/2,
23-
drain_node/2,
24-
revive_node/2]).
22+
rpc/4,
23+
stop_node/2
24+
]).
2525

2626
-define(OPTS, [{connect_timeout, 1},
2727
{ack_timeout, 1}]).
2828

2929
all() ->
3030
[
31-
{group, cluster_size_3},
3231
{group, cluster_size_5}
3332
].
3433

3534
groups() ->
3635
[
37-
{cluster_size_3, [], [
38-
maintenance
39-
]},
40-
{cluster_size_5, [], [
41-
connection_id_tracking,
42-
connection_id_tracking_on_nodedown,
43-
connection_id_tracking_with_decommissioned_node
44-
]}
36+
{cluster_size_5, [],
37+
[
38+
connection_id_tracking,
39+
connection_id_tracking_on_nodedown,
40+
connection_id_tracking_with_decommissioned_node
41+
]}
4542
].
4643

4744
suite() ->
48-
[{timetrap, {minutes, 5}}].
45+
[{timetrap, {minutes, 3}}].
4946

5047
%% -------------------------------------------------------------------
5148
%% Testsuite setup/teardown.
5249
%% -------------------------------------------------------------------
5350

5451
merge_app_env(Config) ->
55-
rabbit_ct_helpers:merge_app_env(Config,
56-
{rabbit, [
57-
{collect_statistics, basic},
58-
{collect_statistics_interval, 100}
59-
]}).
52+
rabbit_ct_helpers:merge_app_env(
53+
Config,
54+
{rabbit, [
55+
{collect_statistics, basic},
56+
{collect_statistics_interval, 100}
57+
]}).
6058

6159
init_per_suite(Config) ->
6260
rabbit_ct_helpers:log_environment(),
@@ -65,20 +63,9 @@ init_per_suite(Config) ->
6563
end_per_suite(Config) ->
6664
rabbit_ct_helpers:run_teardown_steps(Config).
6765

68-
init_per_group(cluster_size_3, Config) ->
69-
case rabbit_ct_helpers:is_mixed_versions() of
70-
true ->
71-
{skip, "maintenance mode wrongly closes cluster-wide MQTT connections "
72-
" in RMQ < 3.11.2 and < 3.10.10"};
73-
false ->
74-
set_cluster_size(3, Config)
75-
end;
7666
init_per_group(cluster_size_5, Config) ->
77-
set_cluster_size(5, Config).
78-
79-
set_cluster_size(NodesCount, Config) ->
8067
rabbit_ct_helpers:set_config(
81-
Config, [{rmq_nodes_count, NodesCount}]).
68+
Config, [{rmq_nodes_count, 5}]).
8269

8370
end_per_group(_, Config) ->
8471
Config.
@@ -107,28 +94,6 @@ end_per_testcase(Testcase, Config) ->
10794
%% Test cases
10895
%% -------------------------------------------------------------------
10996

110-
maintenance(Config) ->
111-
C0 = connect(<<"client-0">>, Config, 0, ?OPTS),
112-
C1a = connect(<<"client-1a">>, Config, 1, ?OPTS),
113-
C1b = connect(<<"client-1b">>, Config, 1, ?OPTS),
114-
115-
timer:sleep(500),
116-
117-
ok = drain_node(Config, 2),
118-
ok = revive_node(Config, 2),
119-
timer:sleep(500),
120-
[?assert(erlang:is_process_alive(C)) || C <- [C0, C1a, C1b]],
121-
122-
process_flag(trap_exit, true),
123-
ok = drain_node(Config, 1),
124-
[await_disconnection(Pid) || Pid <- [C1a, C1b]],
125-
ok = revive_node(Config, 1),
126-
?assert(erlang:is_process_alive(C0)),
127-
128-
ok = drain_node(Config, 0),
129-
await_disconnection(C0),
130-
ok = revive_node(Config, 0).
131-
13297
%% Note about running this testsuite in a mixed-versions cluster:
13398
%% All even-numbered nodes will use the same code base when using a
13499
%% secondary Umbrella. Odd-numbered nodes might use an incompatible code
@@ -147,20 +112,20 @@ connection_id_tracking(Config) ->
147112
C1 = connect(Id, Config, 0, ?OPTS),
148113
{ok, _, _} = emqtt:subscribe(C1, <<"TopicA">>, qos0),
149114
ok = emqtt:publish(C1, <<"TopicA">>, <<"Payload">>),
150-
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
115+
ok = expect_publishes(C1, <<"TopicA">>, [<<"Payload">>]),
151116

152117
%% there's one connection
153118
assert_connection_count(Config, 4, 2, 1),
154119

155120
%% connect to the same node (A or 0)
156121
process_flag(trap_exit, true),
157122
C2 = connect(Id, Config, 0, ?OPTS),
158-
await_disconnection(C1),
123+
await_exit(C1),
159124
assert_connection_count(Config, 4, 2, 1),
160125

161126
%% connect to a different node (C or 2)
162127
C3 = connect(Id, Config, 2, ?OPTS),
163-
await_disconnection(C2),
128+
await_exit(C2),
164129
assert_connection_count(Config, 4, 2, 1),
165130
ok = emqtt:disconnect(C3).
166131

@@ -169,29 +134,29 @@ connection_id_tracking_on_nodedown(Config) ->
169134
C = connect(<<"simpleClient">>, Config, ?OPTS),
170135
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
171136
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
172-
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
137+
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
173138
assert_connection_count(Config, 4, 2, 1),
174139
process_flag(trap_exit, true),
175140
ok = stop_node(Config, Server),
176-
await_disconnection(C),
141+
await_exit(C),
177142
assert_connection_count(Config, 4, 2, 0),
178143
ok.
179144

180145
connection_id_tracking_with_decommissioned_node(Config) ->
181-
case rpc(Config, 0, rabbit_mqtt_ff, track_client_id_in_ra, []) of
146+
case rpc(Config, rabbit_mqtt_ff, track_client_id_in_ra, []) of
182147
false ->
183148
{skip, "This test requires client ID tracking in Ra"};
184149
true ->
185150
Server = get_node_config(Config, 0, nodename),
186151
C = connect(<<"simpleClient">>, Config, ?OPTS),
187152
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
188153
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
189-
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
154+
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
190155

191156
assert_connection_count(Config, 4, 2, 1),
192157
process_flag(trap_exit, true),
193158
{ok, _} = rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
194-
await_disconnection(C),
159+
await_exit(C),
195160
assert_connection_count(Config, 4, 2, 0),
196161
ok
197162
end.
@@ -211,10 +176,3 @@ assert_connection_count(Config, Retries, NodeId, NumElements) ->
211176
timer:sleep(500),
212177
assert_connection_count(Config, Retries-1, NodeId, NumElements)
213178
end.
214-
215-
await_disconnection(Client) ->
216-
receive
217-
{'EXIT', Client, _} -> ok
218-
after
219-
20_000 -> ct:fail({missing_exit, Client})
220-
end.

deps/rabbitmq_mqtt/test/event_recorder.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
init(_) ->
1616
{ok, ?INIT_STATE}.
1717

18-
handle_event(#event{type = node_stats}, State) ->
19-
{ok, State};
20-
handle_event(#event{type = node_node_stats}, State) ->
18+
handle_event(#event{type = T}, State)
19+
when T =:= node_stats orelse
20+
T =:= node_node_stats orelse
21+
T =:= node_node_deleted ->
2122
{ok, State};
2223
handle_event(Event, State) ->
2324
{ok, [Event | State]}.

deps/rabbitmq_mqtt/test/ff_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
-import(rabbit_ct_broker_helpers, [rpc/5]).
1515
-import(rabbit_ct_helpers, [eventually/1]).
16-
-import(util, [expect_publishes/2,
16+
-import(util, [expect_publishes/3,
1717
get_global_counters/4,
1818
connect/2,
1919
connect/4]).
@@ -98,7 +98,7 @@ rabbit_mqtt_qos0_queue(Config) ->
9898
C1 = connect(ClientId, Config),
9999
{ok, _, [0]} = emqtt:subscribe(C1, Topic, qos0),
100100
ok = emqtt:publish(C1, Topic, Msg, qos0),
101-
ok = expect_publishes(Topic, [Msg]),
101+
ok = expect_publishes(C1, Topic, [Msg]),
102102
?assertEqual(1,
103103
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
104104

@@ -109,7 +109,7 @@ rabbit_mqtt_qos0_queue(Config) ->
109109
?assertEqual(1,
110110
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
111111
ok = emqtt:publish(C1, Topic, Msg, qos0),
112-
ok = expect_publishes(Topic, [Msg]),
112+
ok = expect_publishes(C1, Topic, [Msg]),
113113
?assertMatch(#{messages_delivered_total := 2,
114114
messages_delivered_consume_auto_ack_total := 2},
115115
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, rabbit_classic_queue}])),
@@ -125,7 +125,7 @@ rabbit_mqtt_qos0_queue(Config) ->
125125
?assertEqual(1,
126126
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [FeatureFlag]))),
127127
ok = emqtt:publish(C2, Topic, Msg, qos0),
128-
ok = expect_publishes(Topic, [Msg]),
128+
ok = expect_publishes(C2, Topic, [Msg]),
129129
?assertMatch(#{messages_delivered_total := 1,
130130
messages_delivered_consume_auto_ack_total := 1},
131131
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, FeatureFlag}])),

0 commit comments

Comments
 (0)