Skip to content

Commit 7c1aa49

Browse files
committed
Increase MQTT test coverage and fix edge cases
1 parent c9df098 commit 7c1aa49

File tree

8 files changed

+249
-72
lines changed

8 files changed

+249
-72
lines changed

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,11 +1461,10 @@ set_vhost_limit(Config, Node, VHost, Limit0, Value) ->
14611461
max_queues -> <<"max-queues">>;
14621462
Other -> rabbit_data_coercion:to_binary(Other)
14631463
end,
1464-
Definition = rabbit_json:encode(#{Limit => Value}),
1464+
Limits = [{Limit, Value}],
14651465
rpc(Config, Node,
1466-
rabbit_vhost_limit,
1467-
set,
1468-
[VHost, Definition, <<"ct-tests">>]).
1466+
rabbit_vhost_limit, set,
1467+
[VHost, Limits, <<"ct-tests">>]).
14691468

14701469
set_user_limits(Config, Username, Limits) ->
14711470
set_user_limits(Config, 0, Username, Limits).

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ process_request(?PUBACK,
142142
{QMsgId, U} ->
143143
case rabbit_queue_type:settle(QName, complete, ?CONSUMER_TAG, [QMsgId], QStates0) of
144144
{ok, QStates, Actions} ->
145-
%%TODO rabbit_channel:incr_queue_stats/3
146145
message_acknowledged(QName, State),
147146
{ok, handle_queue_actions(Actions, State#state{unacked_server_pubs = U,
148147
queue_states = QStates})};
@@ -349,14 +348,14 @@ process_connect(#mqtt_packet{
349348
PacketConnect, State0) of
350349
{ok, SessionPresent0, State1} ->
351350
{?CONNACK_ACCEPT, SessionPresent0, State1};
352-
{error, ReturnCode0, State1} ->
353-
{ReturnCode0, false, State1}
351+
{error, ConnectionRefusedReturnCode, State1} ->
352+
{ConnectionRefusedReturnCode, false, State1}
354353
end,
355-
ResponsePacket = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?CONNACK},
356-
variable = #mqtt_packet_connack{
357-
session_present = SessionPresent,
358-
return_code = ReturnCode}},
359-
SendFun(ResponsePacket, State),
354+
Response = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?CONNACK},
355+
variable = #mqtt_packet_connack{
356+
session_present = SessionPresent,
357+
return_code = ReturnCode}},
358+
SendFun(Response, State),
360359
return_connack(ReturnCode, State).
361360

362361
check_protocol_version(#mqtt_packet_connect{proto_ver = ProtoVersion}) ->
@@ -368,7 +367,7 @@ check_protocol_version(#mqtt_packet_connect{proto_ver = ProtoVersion}) ->
368367
end.
369368

370369
check_client_id(#mqtt_packet_connect{clean_sess = false,
371-
client_id = []}) ->
370+
client_id = <<>>}) ->
372371
{error, ?CONNACK_ID_REJECTED};
373372
check_client_id(_) ->
374373
ok.
@@ -401,12 +400,10 @@ login({UserBin, PassBin,
401400
State0) ->
402401
ClientId = ensure_client_id(ClientId0),
403402
case process_login(UserBin, PassBin, ClientId, State0) of
404-
already_connected ->
405-
{ok, already_connected};
406403
{ok, State} ->
407404
{ok, Packet, State#state{clean_sess = CleanSess,
408405
client_id = ClientId}};
409-
{error, _Reason, _State} = Err ->
406+
{error, _ConnectionRefusedReturnCode, _State} = Err ->
410407
Err
411408
end.
412409

@@ -419,8 +416,6 @@ ensure_client_id(ClientId)
419416
when is_binary(ClientId) ->
420417
ClientId.
421418

422-
register_client(already_connected, _State) ->
423-
ok;
424419
register_client(Packet = #mqtt_packet_connect{proto_ver = ProtoVersion},
425420
State = #state{client_id = ClientId,
426421
socket = Socket,
@@ -462,8 +457,6 @@ register_client(Packet = #mqtt_packet_connect{proto_ver = ProtoVersion},
462457
{ok, NewProcState(undefined)}
463458
end.
464459

465-
notify_connection_created(already_connected) ->
466-
ok;
467460
notify_connection_created(#mqtt_packet_connect{}) ->
468461
rabbit_networking:register_non_amqp_connection(self()),
469462
self() ! connection_created,
@@ -616,7 +609,7 @@ maybe_unbind(TopicName, TopicNames, QName, State0) ->
616609
{ok, _Output, State} ->
617610
{ok, State};
618611
{error, Reason, _State} = Err ->
619-
rabbit_log:error("Failed to unbind ~s with topic ~s: ~p",
612+
rabbit_log:error("Failed to unbind ~s with topic '~s': ~p",
620613
[rabbit_misc:rs(QName), TopicName, Reason]),
621614
Err
622615
end
@@ -674,17 +667,18 @@ make_will_msg(#mqtt_packet_connect{will_retain = Retain,
674667
dup = false,
675668
payload = Msg}.
676669

677-
process_login(_UserBin, _PassBin, _ClientId,
670+
process_login(_UserBin, _PassBin, ClientId,
678671
#state{peer_addr = Addr,
679672
auth_state = #auth_state{username = Username,
680673
user = User,
681674
vhost = VHost
682-
}})
675+
}} = State)
683676
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
684677
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
685-
rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~ts, vhost ~ts",
686-
[Username, VHost]),
687-
already_connected;
678+
rabbit_log_connection:error(
679+
"MQTT detected duplicate connect attempt for client ID '~ts', user '~ts', vhost '~ts'",
680+
[ClientId, Username, VHost]),
681+
{error, ?CONNACK_ID_REJECTED, State};
688682
process_login(UserBin, PassBin, ClientId,
689683
#state{socket = Sock,
690684
ssl_login_name = SslLoginName,
@@ -713,7 +707,7 @@ process_login(UserBin, PassBin, ClientId,
713707
{ok, _Output, State} ->
714708
rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
715709
{ok, State};
716-
{error, _Reason, _State} = Err ->
710+
{error, _ConnectionRefusedReturnCode, _State} = Err ->
717711
rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
718712
Err
719713
end.
@@ -730,30 +724,30 @@ check_vhost_exists(#{vhost := VHost,
730724
end.
731725

732726
check_vhost_connection_limit(#{vhost := VHost,
733-
username_bin := UsernameBin}) ->
727+
client_id := ClientId,
728+
username_bin := Username}) ->
734729
case rabbit_vhost_limit:is_over_connection_limit(VHost) of
735730
false ->
736731
ok;
737732
{true, Limit} ->
738733
rabbit_log_connection:error(
739-
"Error on MQTT connection ~p~n"
740-
"access to vhost '~s' refused for user '~s': "
741-
"vhost connection limit (~p) is reached",
742-
[self(), VHost, UsernameBin, Limit]),
734+
"Failed to create MQTT connection because vhost connection limit is reached; "
735+
"vhost: '~s'; connection limit: ~p; user: '~s'; client ID '~s'",
736+
[VHost, Limit, Username, ClientId]),
743737
{error, ?CONNACK_NOT_AUTHORIZED}
744738
end.
745739

746740
check_vhost_alive(#{vhost := VHost,
741+
client_id := ClientId,
747742
username_bin := UsernameBin}) ->
748743
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
749744
true ->
750745
ok;
751746
false ->
752747
rabbit_log_connection:error(
753-
"Error on MQTT connection ~p~n"
754-
"access refused for user '~s': "
755-
"vhost is down",
756-
[self(), UsernameBin, VHost]),
748+
"Failed to create MQTT connection because vhost is down; "
749+
"vhost: ~s; user: ~s; client ID: ~s",
750+
[VHost, UsernameBin, ClientId]),
757751
{error, ?CONNACK_NOT_AUTHORIZED}
758752
end.
759753

@@ -798,16 +792,16 @@ notify_auth_result(AuthResult, Username, #state{conn_name = ConnName}) ->
798792
{connection_name, ConnName},
799793
{connection_type, network}]).
800794

801-
check_user_connection_limit(#{user := #user{username = Username}}) ->
795+
check_user_connection_limit(#{user := #user{username = Username},
796+
client_id := ClientId}) ->
802797
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
803798
false ->
804799
ok;
805800
{true, Limit} ->
806801
rabbit_log_connection:error(
807-
"Error on MQTT connection ~p~n"
808-
"access refused for user '~s': "
809-
"user connection limit (~p) is reached",
810-
[self(), Username, Limit]),
802+
"Failed to create MQTT connection because user connection limit is reached; "
803+
"user: '~s'; connection limit: ~p; client ID '~s'",
804+
[Username, Limit, ClientId]),
811805
{error, ?CONNACK_NOT_AUTHORIZED}
812806
end.
813807

@@ -1230,8 +1224,6 @@ deliver_to_queues(Delivery,
12301224
RoutedToQNames,
12311225
State0 = #state{queue_states = QStates0,
12321226
proto_ver = ProtoVer}) ->
1233-
%% TODO only lookup fields that are needed using ets:select / match?
1234-
%% TODO Use ETS continuations to be more space efficient
12351227
Qs0 = rabbit_amqqueue:lookup(RoutedToQNames),
12361228
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
12371229
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
@@ -1588,7 +1580,7 @@ maybe_publish_to_client(
15881580
routing_keys = [RoutingKey | _CcRoutes],
15891581
content = #content{payload_fragments_rev = FragmentsRev}}},
15901582
QoS, State0 = #state{send_fun = SendFun}) ->
1591-
{PacketId, State} = queue_packet_id_to_packet_id(QMsgId, QoS, State0),
1583+
{PacketId, State} = msg_id_to_packet_id(QMsgId, QoS, State0),
15921584
Packet =
15931585
#mqtt_packet{
15941586
fixed = #mqtt_packet_fixed{
@@ -1609,11 +1601,11 @@ maybe_publish_to_client(
16091601
message_delivered(QNameOrType, Redelivered, QoS, State),
16101602
State.
16111603

1612-
queue_packet_id_to_packet_id(_, ?QOS_0, State) ->
1604+
msg_id_to_packet_id(_, ?QOS_0, State) ->
16131605
%% "A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0 [MQTT-2.2.1-2]."
16141606
{undefined, State};
1615-
queue_packet_id_to_packet_id(QMsgId, ?QOS_1, #state{packet_id = PktId,
1616-
unacked_server_pubs = U} = State) ->
1607+
msg_id_to_packet_id(QMsgId, ?QOS_1, #state{packet_id = PktId,
1608+
unacked_server_pubs = U} = State) ->
16171609
{PktId, State#state{packet_id = increment_packet_id(PktId),
16181610
unacked_server_pubs = maps:put(PktId, QMsgId, U)}}.
16191611

@@ -1626,14 +1618,8 @@ increment_packet_id(Id) ->
16261618

16271619
maybe_auto_ack(_AckRequired = true, ?QOS_0, QName, QMsgId,
16281620
State = #state{queue_states = QStates0}) ->
1629-
case rabbit_queue_type:settle(QName, complete, ?CONSUMER_TAG, [QMsgId], QStates0) of
1630-
{ok, QStates, Actions} ->
1631-
%%TODO rabbit_channel:incr_queue_stats/3
1632-
handle_queue_actions(Actions, State#state{queue_states = QStates});
1633-
{protocol_error, _ErrorType, _Reason, _ReasonArgs} = Err ->
1634-
%%TODO handle error
1635-
throw(Err)
1636-
end;
1621+
{ok, QStates, Actions} = rabbit_queue_type:settle(QName, complete, ?CONSUMER_TAG, [QMsgId], QStates0),
1622+
handle_queue_actions(Actions, State#state{queue_states = QStates});
16371623
maybe_auto_ack(_, _, _, _, State) ->
16381624
State.
16391625

deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ queue_name_bin(ClientId, QoS) ->
3737
queue_name0(Prefix, QoS).
3838

3939
queue_name0(Prefix, ?QOS_0) ->
40-
%%TODO consider shortening the QoS0 queue name to save memory
41-
%%(can't change QoS1 name to not break rolling updates)
4240
<<Prefix/binary, "0">>;
4341
queue_name0(Prefix, ?QOS_1) ->
4442
<<Prefix/binary, "1">>.

0 commit comments

Comments
 (0)