Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions src/linux/backend-v4l2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,31 @@ namespace librealsense
{
namespace platform
{
std::recursive_mutex named_mutex::_init_mutex;
std::map<std::string, std::recursive_mutex> named_mutex::_dev_mutex;
std::map<std::string, int> named_mutex::_dev_mutex_cnt;

// named_mutex usage note:
// ----------------------
// acquire() (or the encapsulating lock() ) function throws exception if fails.
// It increase its counters count non the less.
// Therefor, a lock() must be surrounded by try-catch section and a call to
// release() (or unlock() or destructor) must be performed, in case of failure,
// to decrease the counters count.
//
named_mutex::named_mutex(const std::string& device_path, unsigned timeout)
: _device_path(device_path),
_timeout(timeout), // TODO: try to lock with timeout
_fildes(-1)
_fildes(-1),
_object_lock_counter(0)
{
_init_mutex.lock();
_dev_mutex[_device_path]; // insert a mutex for _device_path
if (_dev_mutex_cnt.find(_device_path) == _dev_mutex_cnt.end())
{
_dev_mutex_cnt[_device_path] = 0;
}
_init_mutex.unlock();
}

named_mutex::~named_mutex()
Expand Down Expand Up @@ -156,32 +176,59 @@ namespace librealsense

void named_mutex::acquire()
{
if (-1 == _fildes)
_dev_mutex[_device_path].lock();
_dev_mutex_cnt[_device_path] += 1; //Advance counters even if throws because catch calls release()
_object_lock_counter += 1;
if (_dev_mutex_cnt[_device_path] == 1)
{
_fildes = open(_device_path.c_str(), O_RDWR, 0); //TODO: check
if(0 > _fildes)
throw linux_backend_exception(to_string() << "Cannot open '" << _device_path);
}
if (-1 == _fildes)
{
_fildes = open(_device_path.c_str(), O_RDWR, 0); //TODO: check
if(0 > _fildes)
throw linux_backend_exception(to_string() << "Cannot open '" << _device_path);
}

auto ret = lockf(_fildes, F_LOCK, 0);
if (0 != ret)
throw linux_backend_exception(to_string() << "Acquire failed");
auto ret = lockf(_fildes, F_LOCK, 0);
if (0 != ret)
throw linux_backend_exception(to_string() << "Acquire failed");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will _dev_mutex[_device_path].unlock(); be called in case of exception?

}
}

void named_mutex::release()
{
if (-1 == _fildes)
_object_lock_counter -= 1;
if (_object_lock_counter < 0)
{
_object_lock_counter = 0;
return;
}
_dev_mutex_cnt[_device_path] -= 1;
std::string err_msg;
if (_dev_mutex_cnt[_device_path] < 0)
{
_dev_mutex_cnt[_device_path] = 0;
throw linux_backend_exception(to_string() << "Error: _dev_mutex_cnt[" << _device_path << "] < 0");
}

auto ret = lockf(_fildes, F_ULOCK, 0);
if (0 != ret)
throw linux_backend_exception(to_string() << "lockf(...) failed");

ret = close(_fildes);
if (0 != ret)
throw linux_backend_exception(to_string() << "close(...) failed");
if ((_dev_mutex_cnt[_device_path] == 0) && (-1 != _fildes))
{
auto ret = lockf(_fildes, F_ULOCK, 0);
if (0 != ret)
err_msg = to_string() << "lockf(...) failed";
else
{
ret = close(_fildes);
if (0 != ret)
err_msg = to_string() << "close(...) failed";
else
_fildes = -1;
}
}
_dev_mutex[_device_path].unlock();

_fildes = -1;
if (!err_msg.empty())
throw linux_backend_exception(err_msg);

}

static int xioctl(int fh, unsigned long request, void *arg)
Expand Down
5 changes: 4 additions & 1 deletion src/linux/backend-v4l2.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ namespace librealsense
std::string _device_path;
uint32_t _timeout;
int _fildes;
static std::recursive_mutex _init_mutex;
static std::map<std::string, std::recursive_mutex> _dev_mutex;
static std::map<std::string, int> _dev_mutex_cnt;
int _object_lock_counter;
std::mutex _mutex;
};

static int xioctl(int fh, unsigned long request, void *arg);

class buffer
Expand Down
1 change: 1 addition & 0 deletions unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set (unit_tests_sources
unit-tests-main.cpp
unit-tests-common.h
unit-tests-post-processing-from-bag.cpp
unit-test-long.cpp
catch.h
approx.h
)
Expand Down
5 changes: 5 additions & 0 deletions unit-tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set (INTERNAL_TESTS_SOURCES
internal-tests-types.cpp
internal-tests-uv-map.cpp
internal-tests-class-logic.cpp
internal-tests-linux.cpp
../catch.h
../approx.h
)
Expand All @@ -21,3 +22,7 @@ set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 11)
target_link_libraries(${PROJECT_NAME} ${DEPENDENCIES})
include_directories(${PROJECT_NAME} ../ ../../src/)
set_target_properties (${PROJECT_NAME} PROPERTIES FOLDER "Unit-Tests")

if(UNIX)
include_directories(${LIBUSB_INC})
endif()
190 changes: 190 additions & 0 deletions unit-tests/internal/internal-tests-linux.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#include "catch/catch.hpp"
#include <thread>
#include <string>
#include <linux/backend-v4l2.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <semaphore.h>

using namespace librealsense::platform;

TEST_CASE("named_mutex_threads", "[code]")
{
class locker_test
{
std::mutex _m0, _m1;
bool _go_0, _go_1;
std::condition_variable _cv0, _cv1;
bool _actual_test;

std::string _device_path;
public:
locker_test(const std::string& device_path, bool actual_test):
_go_0(false),
_go_1(false),
_actual_test(actual_test),
_device_path(device_path)
{
};

void func_0()
{
{
std::unique_lock<std::mutex> lk(_m0);
_cv0.wait(lk, [this]{return _go_0;});
}
named_mutex mutex0(_device_path, 0);
if (_actual_test)
{
mutex0.lock();
}
{
std::unique_lock<std::mutex> lk(_m0);
_go_0 = false;
}
}
void func_1()
{
{
std::unique_lock<std::mutex> lk(_m1);
_cv1.wait(lk, [this]{return _go_1;});
}
named_mutex mutex1(_device_path, 0);
mutex1.lock();
{
std::lock_guard<std::mutex> lk_gm(_m1);
_go_1 = false;
}
_cv1.notify_all();
{
std::unique_lock<std::mutex> lk(_m1);
_cv1.wait(lk, [this]{return _go_1;});
}
}

void run_test()
{
bool test_ok(false);

std::thread t0 = std::thread([this](){func_0();});
std::thread t1 = std::thread([this](){func_1();});
// Tell Thread 1 to lock named_mutex.
{
std::lock_guard<std::mutex> lk_gm(_m1);
_go_1 = true;
}
_cv1.notify_all();
// Wait for Thread 1 to acknowledge lock.
{
std::unique_lock<std::mutex> lk(_m1);
_cv1.wait(lk, [this]{return !_go_1;});
}
// Tell Thread 0 to lock named_mutex.
{
std::lock_guard<std::mutex> lk_gm(_m0);
_go_0 = true;
}
_cv0.notify_all();
// Give Thread 2 seconds opportunity to lock.
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
{
std::lock_guard<std::mutex> lk_gm(_m0);
// test_ok if thread 0 didn't manage to change value of _go_0.
test_ok = (_go_0 == _actual_test);
}
// Tell thread 1 to finish and release named_mutex.
{
std::lock_guard<std::mutex> lk_gm(_m1);
_go_1 = true;
}
_cv1.notify_all();
t1.join();
t0.join();
REQUIRE(test_ok);
}
};
std::string device_path("./named_mutex_test");
int fid(-1);
if( access( device_path.c_str(), F_OK ) == -1 )
{
fid = open(device_path.c_str(), O_CREAT | O_RDWR, 0666);
close(fid);
}
bool actual_test;
SECTION("self-validation")
{
actual_test = false;
}
SECTION("actual-test")
{
actual_test = true;
}

locker_test _test(device_path, actual_test);
_test.run_test();
}

TEST_CASE("named_mutex_processes", "[code]")
{
std::string device_path("./named_mutex_test");
int fid(-1);
if( access( device_path.c_str(), F_OK ) == -1 )
{
fid = open(device_path.c_str(), O_CREAT | O_RDWR, 0666);
close(fid);
}

sem_unlink("test_semaphore1");
sem_t *sem1 = sem_open("test_semaphore1", O_CREAT|O_EXCL, S_IRWXU, 0);
CHECK_FALSE(sem1 == SEM_FAILED);
sem_unlink("test_semaphore2");
sem_t *sem2 = sem_open("test_semaphore2", O_CREAT|O_EXCL, S_IRWXU, 0);
CHECK_FALSE(sem2 == SEM_FAILED);
pid_t pid_0 = getpid();
pid_t pid = fork();
bool actual_test;
SECTION("self-validation")
{
actual_test = false;
}
SECTION("actual-test")
{
actual_test = true;
}
if (pid == 0) // child
{
signal(SIGTERM, [](int signum) { exit(1); });

sem_wait(sem1);
sem_post(sem2);
named_mutex mutex1(device_path, 0);
if (actual_test)
mutex1.lock();
exit(0);
}
else
{
named_mutex mutex1(device_path, 0);
mutex1.lock();
CHECK_FALSE(sem_post(sem1) < 0);
sem_wait(sem2);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
int status;
pid_t w = waitpid(pid, &status, WNOHANG);
CHECK_FALSE(w == -1);
bool child_alive(w == 0);
if (child_alive) {
int res = kill(pid,SIGTERM);
pid_t w = waitpid(pid, &status, 0);
}
if (fid > 0)
{
remove(device_path.c_str());
}
sem_unlink("test_semaphore1");
sem_close(sem1);
sem_unlink("test_semaphore2");
sem_close(sem2);
REQUIRE(child_alive == actual_test);
}
}
Loading