Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/Dockerfile-ubuntu-24.04
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ RUN apt-get update \
clang-19 \
clang-format-19 \
libclang-rt-19-dev \
llvm-19 \
cmake \
python3 \
python3-pip \
Expand Down
60 changes: 41 additions & 19 deletions src/mavsdk/core/tcp_client_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ ConnectionResult TcpClientConnection::setup_port()
}
#endif

_socket_fd.reset(socket(AF_INET, SOCK_STREAM, 0));
std::lock_guard<std::mutex> lock(_mutex);

if (_socket_fd.empty()) {
int new_fd = socket(AF_INET, SOCK_STREAM, 0);

if (new_fd < 0) {
LogErr() << "socket error" << strerror(errno);
return ConnectionResult::SocketError;
}
Expand All @@ -90,34 +92,31 @@ ConnectionResult TcpClientConnection::setup_port()
hp = gethostbyname(_remote_ip.c_str());
if (hp == nullptr) {
LogErr() << "Could not get host by name";
_socket_fd.close();
return ConnectionResult::SocketConnectionError;
}

memcpy(&remote_addr.sin_addr, hp->h_addr, hp->h_length);

if (connect(
_socket_fd.get(),
reinterpret_cast<sockaddr*>(&remote_addr),
sizeof(struct sockaddr_in)) < 0) {
if (connect(new_fd, reinterpret_cast<sockaddr*>(&remote_addr), sizeof(struct sockaddr_in)) <
0) {
LogErr() << "Connect error: " << strerror(errno);
_socket_fd.close();
return ConnectionResult::SocketConnectionError;
}

// Set receive timeout cross-platform
const unsigned timeout_ms = 500;

#if defined(WINDOWS)
setsockopt(
_socket_fd.get(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
setsockopt(_socket_fd.get(), SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
#endif

_socket_fd.reset(new_fd);

return ConnectionResult::Success;
}

Expand All @@ -135,7 +134,10 @@ ConnectionResult TcpClientConnection::stop()
_recv_thread.reset();
}

_socket_fd.close();
{
std::lock_guard<std::mutex> lock(_mutex);
_socket_fd.close();
}

// We need to stop this after stopping the receive thread, otherwise
// it can happen that we interfere with the parsing of a message.
Expand All @@ -155,10 +157,6 @@ std::pair<bool, std::string> TcpClientConnection::send_message(const mavlink_mes

std::pair<bool, std::string> TcpClientConnection::send_raw_bytes(const char* bytes, size_t length)
{
if (_socket_fd.empty()) {
return std::make_pair(false, "Not connected");
}

std::pair<bool, std::string> result;

if (_remote_ip.empty()) {
Expand All @@ -175,13 +173,25 @@ std::pair<bool, std::string> TcpClientConnection::send_raw_bytes(const char* byt
return result;
}

// Get socket fd with mutex protection, then release mutex before blocking send
SocketHolder::DescriptorType socket_fd;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_socket_fd.empty()) {
result.first = false;
result.second = "Not connected";
return result;
}
socket_fd = _socket_fd.get();
}

#if !defined(MSG_NOSIGNAL)
auto flags = 0;
#else
auto flags = MSG_NOSIGNAL;
#endif

const auto send_len = send(_socket_fd.get(), bytes, length, flags);
const auto send_len = send(socket_fd, bytes, length, flags);

if (send_len != static_cast<std::remove_cv_t<decltype(send_len)>>(length)) {
std::stringstream ss;
Expand All @@ -202,13 +212,25 @@ void TcpClientConnection::receive()
char buffer[2048];

while (!_should_exit) {
if (_socket_fd.empty()) {
// Get socket fd with mutex protection, then release mutex before blocking recv
SocketHolder::DescriptorType socket_fd;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_socket_fd.empty()) {
// Socket not connected, need to reconnect
socket_fd = SocketHolder::invalid_socket_fd;
} else {
socket_fd = _socket_fd.get();
}
}

if (socket_fd == SocketHolder::invalid_socket_fd) {
std::this_thread::sleep_for(std::chrono::seconds(1));
setup_port();
continue;
}

const auto recv_len = recv(_socket_fd.get(), buffer, sizeof(buffer), 0);
const auto recv_len = recv(socket_fd, buffer, sizeof(buffer), 0);

if (recv_len == 0) {
// Connection closed, just try again.
Expand Down
124 changes: 81 additions & 43 deletions src/mavsdk/core/tcp_server_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,22 @@ ConnectionResult TcpServerConnection::start()
}
#endif

std::lock_guard<std::mutex> lock(_mutex);

_server_socket_fd.reset(socket(AF_INET, SOCK_STREAM, 0));
if (_server_socket_fd.empty()) {
LogErr() << "socket error: " << strerror(errno);
return ConnectionResult::SocketError;
}

// Allow reuse of address to avoid "Address already in use" errors
int yes = 1;
#ifdef WINDOWS
setsockopt(_server_socket_fd.get(), SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
#else
setsockopt(_server_socket_fd.get(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
#endif

sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
Expand All @@ -85,30 +95,6 @@ ConnectionResult TcpServerConnection::start()
return ConnectionResult::SocketError;
}

// Set receive timeout cross-platform
const unsigned timeout_ms = 500;

#if defined(WINDOWS)
setsockopt(
_server_socket_fd.get(),
SOL_SOCKET,
SO_RCVTIMEO,
(const char*)&timeout_ms,
sizeof(timeout_ms));
setsockopt(
_client_socket_fd.get(),
SOL_SOCKET,
SO_RCVTIMEO,
(const char*)&timeout_ms,
sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
setsockopt(_server_socket_fd.get(), SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
setsockopt(_client_socket_fd.get(), SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
#endif

_accept_receive_thread =
std::make_unique<std::thread>(&TcpServerConnection::accept_client, this);

Expand All @@ -124,8 +110,11 @@ ConnectionResult TcpServerConnection::stop()
_accept_receive_thread.reset();
}

_client_socket_fd.close();
_server_socket_fd.close();
{
std::lock_guard<std::mutex> lock(_mutex);
_client_socket_fd.close();
_server_socket_fd.close();
}

// We need to stop this after stopping the receive thread, otherwise
// it can happen that we interfere with the parsing of a message.
Expand All @@ -147,31 +136,37 @@ std::pair<bool, std::string> TcpServerConnection::send_message(const mavlink_mes

void TcpServerConnection::accept_client()
{
// Get server socket fd with mutex protection for setup
SocketHolder::DescriptorType server_socket_fd;
{
std::lock_guard<std::mutex> lock(_mutex);
server_socket_fd = _server_socket_fd.get();
}

#ifdef WINDOWS
// Set server socket to non-blocking
u_long iMode = 1;
int iResult = ioctlsocket(_server_socket_fd.get(), FIONBIO, &iMode);
int iResult = ioctlsocket(server_socket_fd, FIONBIO, &iMode);
if (iResult != 0) {
LogErr() << "ioctlsocket failed with error: " << get_socket_error_string(WSAGetLastError());
}
#else
// Set server socket to non-blocking
int flags = fcntl(_server_socket_fd.get(), F_GETFL, 0);
fcntl(_server_socket_fd.get(), F_SETFL, flags | O_NONBLOCK);
int flags = fcntl(server_socket_fd, F_GETFL, 0);
fcntl(server_socket_fd, F_SETFL, flags | O_NONBLOCK);
#endif

while (!_should_exit) {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(_server_socket_fd.get(), &readfds);
FD_SET(server_socket_fd, &readfds);

// Set timeout to 1 second
timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;

const int activity =
select(_server_socket_fd.get() + 1, &readfds, nullptr, nullptr, &timeout);
const int activity = select(server_socket_fd + 1, &readfds, nullptr, nullptr, &timeout);

if (activity < 0 && errno != EINTR) {
LogErr() << "select error: " << strerror(errno);
Expand All @@ -183,24 +178,39 @@ void TcpServerConnection::accept_client()
continue;
}

if (FD_ISSET(_server_socket_fd.get(), &readfds)) {
if (FD_ISSET(server_socket_fd, &readfds)) {
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);

{
_client_socket_fd.reset(accept(
_server_socket_fd.get(),
reinterpret_cast<sockaddr*>(&client_addr),
&client_addr_len));
}
if (_client_socket_fd.empty()) {
int new_fd = accept(
server_socket_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);

if (new_fd < 0) {
if (_should_exit) {
return;
}
LogErr() << "accept error: " << strerror(errno);
continue;
}

// Set receive timeout on client socket
const unsigned timeout_ms = 500;
#if defined(WINDOWS)
setsockopt(
new_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
#endif

// Now store the new client socket with mutex protection
{
std::lock_guard<std::mutex> lock(_mutex);
_client_socket_fd.reset(new_fd);
}

receive();
}
}
Expand All @@ -210,9 +220,19 @@ void TcpServerConnection::receive()
{
std::array<char, 2048> buffer{};

// Get client socket fd with mutex protection, then release mutex before blocking recv
SocketHolder::DescriptorType client_socket_fd;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_client_socket_fd.empty()) {
return;
}
client_socket_fd = _client_socket_fd.get();
}

bool dataReceived = false;
while (!dataReceived && !_should_exit) {
const auto recv_len = recv(_client_socket_fd.get(), buffer.data(), buffer.size(), 0);
const auto recv_len = recv(client_socket_fd, buffer.data(), buffer.size(), 0);

#ifdef WINDOWS
if (recv_len == SOCKET_ERROR) {
Expand Down Expand Up @@ -249,7 +269,13 @@ void TcpServerConnection::receive()
#endif

if (recv_len == 0) {
continue;
// Client disconnected, close the socket and go back to accept new connections
LogInfo() << "TCP client disconnected, waiting for new connection...";
{
std::lock_guard<std::mutex> lock(_mutex);
_client_socket_fd.close();
}
return;
}

_mavlink_receiver->set_new_datagram(buffer.data(), static_cast<int>(recv_len));
Expand All @@ -275,13 +301,25 @@ std::pair<bool, std::string> TcpServerConnection::send_raw_bytes(const char* byt
// Basic implementation for TCP server connections
std::pair<bool, std::string> result;

// Get client socket fd with mutex protection, then release mutex before blocking send
SocketHolder::DescriptorType client_socket_fd;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_client_socket_fd.empty()) {
result.first = false;
result.second = "Not connected";
return result;
}
client_socket_fd = _client_socket_fd.get();
}

#if !defined(MSG_NOSIGNAL)
auto flags = 0;
#else
auto flags = MSG_NOSIGNAL;
#endif

const auto send_len = send(_client_socket_fd.get(), bytes, length, flags);
const auto send_len = send(client_socket_fd, bytes, length, flags);

if (send_len != static_cast<std::remove_cv_t<decltype(send_len)>>(length)) {
// Broken pipe is expected during shutdown, don't log it
Expand Down
2 changes: 2 additions & 0 deletions src/mavsdk/core/tcp_server_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "socket_holder.h"

#include <atomic>
#include <mutex>
#include <string>
#include <thread>

Expand Down Expand Up @@ -32,6 +33,7 @@ class TcpServerConnection : public Connection {
Connection::ReceiverCallback _receiver_callback;
std::string _local_ip;
int _local_port;
std::mutex _mutex{};
SocketHolder _server_socket_fd;
SocketHolder _client_socket_fd;
std::unique_ptr<std::thread> _accept_receive_thread;
Expand Down
1 change: 1 addition & 0 deletions src/system_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ add_executable(system_tests_runner
intercept.cpp
mavlink_direct.cpp
mavlink_direct_forwarding.cpp
connections.cpp
system_tests_runner.cpp
)

Expand Down
Loading