Skip to content

Commit 7f8c9d3

Browse files
Implement 'rmw_subscription_set_on_new_message_callback' (#88)
1 parent 4936431 commit 7f8c9d3

File tree

2 files changed

+69
-9
lines changed

2 files changed

+69
-9
lines changed

rmw_iceoryx_cpp/src/rmw_subscription.cpp

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <string>
1717

1818
#include "iceoryx_posh/capro/service_description.hpp"
19+
#include "iceoryx_posh/popo/listener.hpp"
1920

2021
#include "rcutils/error_handling.h"
2122

@@ -188,11 +189,49 @@ rmw_ret_t rmw_subscription_set_on_new_message_callback(
188189
rmw_event_callback_t callback,
189190
const void * user_data)
190191
{
191-
RCUTILS_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
192-
RCUTILS_CHECK_ARGUMENT_FOR_NULL(callback, RMW_RET_INVALID_ARGUMENT);
193-
RCUTILS_CHECK_ARGUMENT_FOR_NULL(user_data, RMW_RET_INVALID_ARGUMENT);
192+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
194193

195-
return RMW_RET_UNSUPPORTED;
194+
auto iceoryx_subscription = static_cast<IceoryxSubscription *>(subscription->data);
195+
if (!iceoryx_subscription) {
196+
RMW_SET_ERROR_MSG("subscription data is null");
197+
return RMW_RET_ERROR;
198+
}
199+
200+
auto iceoryx_receiver = iceoryx_subscription->iceoryx_receiver_;
201+
if (!iceoryx_receiver) {
202+
RMW_SET_ERROR_MSG("iceoryx_receiver is null");
203+
return RMW_RET_ERROR;
204+
}
205+
const std::lock_guard<std::mutex> lock(iceoryx_subscription->mutex_);
206+
rmw_ret_t ret = RMW_RET_ERROR;
207+
208+
if (callback == nullptr) {
209+
iceoryx_subscription->listener_.detachEvent(
210+
*(iceoryx_subscription->iceoryx_receiver_),
211+
iox::popo::SubscriberEvent::DATA_RECEIVED);
212+
ret = RMW_RET_OK;
213+
return ret;
214+
}
215+
216+
iceoryx_subscription->user_callback_ = callback;
217+
iceoryx_subscription->user_data_ = user_data;
218+
iceoryx_subscription->listener_
219+
.attachEvent(
220+
*(iceoryx_subscription->iceoryx_receiver_),
221+
iox::popo::SubscriberEvent::DATA_RECEIVED,
222+
iox::popo::createNotificationCallback(
223+
IceoryxSubscription::onSampleReceivedCallback,
224+
*iceoryx_subscription))
225+
.or_else(
226+
[&](auto) {
227+
RMW_SET_ERROR_MSG(
228+
"rmw_subscription_get_content_filter: Unable to attach subscriber to listener");
229+
ret = RMW_RET_ERROR;
230+
});
231+
232+
ret = RMW_RET_OK;
233+
234+
return ret;
196235
}
197236

198237
rmw_ret_t

rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
#ifndef TYPES__ICEORYX_SUBSCRIPTION_HPP_
1616
#define TYPES__ICEORYX_SUBSCRIPTION_HPP_
1717

18+
#include <mutex>
19+
1820
#include "iceoryx_posh/popo/untyped_subscriber.hpp"
21+
#include "iceoryx_posh/popo/listener.hpp"
1922

23+
#include "rmw/error_handling.h"
2024
#include "rmw/rmw.h"
2125
#include "rmw/types.h"
2226

@@ -31,12 +35,29 @@ struct IceoryxSubscription
3135
iceoryx_receiver_(iceoryx_receiver),
3236
is_fixed_size_(rmw_iceoryx_cpp::iceoryx_is_fixed_size(type_supports)),
3337
message_size_(rmw_iceoryx_cpp::iceoryx_get_message_size(type_supports))
34-
{}
38+
{
39+
}
3540

3641
rosidl_message_type_support_t type_supports_;
37-
iox::popo::UntypedSubscriber * const iceoryx_receiver_;
38-
bool is_fixed_size_;
39-
size_t message_size_;
40-
};
42+
iox::popo::UntypedSubscriber * const iceoryx_receiver_{nullptr};
43+
bool is_fixed_size_{false};
44+
size_t message_size_{0};
45+
std::mutex mutex_;
46+
/// TODO Why not having one listener for all subscriptions?
47+
iox::popo::Listener listener_;
48+
rmw_event_callback_t user_callback_{nullptr};
49+
const void * user_data_{nullptr};
4150

51+
static void onSampleReceivedCallback(iox::popo::UntypedSubscriber *, IceoryxSubscription * self)
52+
{
53+
/// TODO This lock isn't needed, the listener calls are sequential, right?
54+
const std::lock_guard<std::mutex> lock(self->mutex_);
55+
if (self == nullptr) {
56+
RMW_SET_ERROR_MSG("onSampleReceivedCallback: Invalid arguments");
57+
return;
58+
}
59+
60+
self->user_callback_(self->user_data_, 1);
61+
}
62+
};
4263
#endif // TYPES__ICEORYX_SUBSCRIPTION_HPP_

0 commit comments

Comments
 (0)