Skip to content

Commit 3432aa7

Browse files
carlossvgcarlos-apex
authored andcommitted
Refactor random talker example
Signed-off-by: Carlos San Vicente <[email protected]>
1 parent 8d519be commit 3432aa7

File tree

8 files changed

+209
-108
lines changed

8 files changed

+209
-108
lines changed

rclcpp/wait_set/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ ament_target_dependencies(wait_set_example rclcpp std_msgs)
3030
add_executable(wait_set_random_order wait_set_random_order.cpp)
3131
ament_target_dependencies(wait_set_random_order rclcpp std_msgs)
3232

33+
add_executable(executor_random_order executor_random_order.cpp)
34+
ament_target_dependencies(executor_random_order rclcpp std_msgs)
35+
3336
add_executable(wait_set_different_rates_topics wait_set_different_rates_topics.cpp)
3437
ament_target_dependencies(wait_set_different_rates_topics rclcpp std_msgs)
3538

@@ -42,6 +45,7 @@ install(TARGETS
4245
thread_safe_wait_set
4346
wait_set_example
4447
wait_set_random_order
48+
executor_random_order
4549
wait_set_different_rates_topics
4650
wait_set_composition
4751
DESTINATION lib/${PROJECT_NAME}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2021, Apex.AI Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <rclcpp/rclcpp.hpp>
16+
#include <std_msgs/msg/string.hpp>
17+
#include "random_listener.hpp"
18+
#include "random_talker.hpp"
19+
20+
/* For this example, we will be creating a talker node with three publishers which will
21+
* publish the topics A, B, C in random order each time. In this case the messages are handled
22+
* using an executor. The order in which the messages are handled will depend on the message
23+
* arrival and the type of messages available at the moment.
24+
*/
25+
26+
int32_t main(const int32_t argc, char ** const argv)
27+
{
28+
rclcpp::init(argc, argv);
29+
30+
// We run the talker and the listener in different threads to simulate inter-process
31+
// communications.
32+
// Note the order of execution would be deterministic if both nodes are spun in the same
33+
// executor (A, B, C). This is because the publishing happens always before the subscription
34+
// handling and the executor handles the messages in the order in which the subscriptions were
35+
// created. Using difference threads the handling order depends on the message arrival and
36+
// type of messages available.
37+
auto thread = std::thread([]() {rclcpp::spin(std::make_shared<RandomTalker>());});
38+
rclcpp::spin(std::make_shared<Listener>());
39+
40+
rclcpp::shutdown();
41+
thread.join();
42+
43+
return 0;
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2021, Apex.AI Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <rclcpp/rclcpp.hpp>
16+
#include <std_msgs/msg/string.hpp>
17+
#include <random>
18+
19+
class Listener : public rclcpp::Node
20+
{
21+
using subscription_list = std::vector<rclcpp::Subscription<std_msgs::msg::String>::SharedPtr>;
22+
23+
public:
24+
Listener()
25+
: Node("random_listener")
26+
{
27+
auto print_msg = [this](std_msgs::msg::String::UniquePtr msg) {
28+
RCLCPP_INFO(this->get_logger(), "I heard: %s", msg->data.c_str());
29+
};
30+
sub1_ = this->create_subscription<std_msgs::msg::String>("topicA", 10, print_msg);
31+
sub2_ = this->create_subscription<std_msgs::msg::String>("topicB", 10, print_msg);
32+
sub3_ = this->create_subscription<std_msgs::msg::String>("topicC", 10, print_msg);
33+
}
34+
35+
subscription_list get_subscriptions() const
36+
{
37+
return {sub1_, sub2_, sub3_};
38+
}
39+
40+
private:
41+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub1_;
42+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub2_;
43+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub3_;
44+
};

rclcpp/wait_set/random_talker.hpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2021, Apex.AI Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <rclcpp/rclcpp.hpp>
16+
#include <std_msgs/msg/string.hpp>
17+
#include <random>
18+
19+
using namespace std::chrono_literals;
20+
21+
class RandomTalker : public rclcpp::Node
22+
{
23+
public:
24+
RandomTalker()
25+
: Node("random_talker"),
26+
pub1_(this->create_publisher<std_msgs::msg::String>("topicA", 10)),
27+
pub2_(this->create_publisher<std_msgs::msg::String>("topicB", 10)),
28+
pub3_(this->create_publisher<std_msgs::msg::String>("topicC", 10)),
29+
rand_engine_(std::chrono::system_clock::now().time_since_epoch().count())
30+
{
31+
publish_functions_.emplace_back(
32+
([this]() {
33+
std_msgs::msg::String msg;
34+
msg.data = "A";
35+
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
36+
pub1_->publish(msg);
37+
}));
38+
publish_functions_.emplace_back(
39+
([this]() {
40+
std_msgs::msg::String msg;
41+
msg.data = "B";
42+
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
43+
pub2_->publish(msg);
44+
}));
45+
publish_functions_.emplace_back(
46+
([this]() {
47+
std_msgs::msg::String msg;
48+
msg.data = "C";
49+
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
50+
pub3_->publish(msg);
51+
}));
52+
auto timer_callback =
53+
[this]() -> void {
54+
std::shuffle(publish_functions_.begin(), publish_functions_.end(), rand_engine_);
55+
for (const auto & f : publish_functions_) {f();}
56+
};
57+
timer_ = this->create_wall_timer(1s, timer_callback);
58+
}
59+
60+
private:
61+
rclcpp::TimerBase::SharedPtr timer_;
62+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub1_;
63+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub2_;
64+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub3_;
65+
std::vector<std::function<void()>> publish_functions_;
66+
std::default_random_engine rand_engine_;
67+
};

rclcpp/wait_set/wait_set_composition.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,13 @@ int main(int argc, char * argv[])
116116
std_msgs::msg::String msg;
117117
rclcpp::MessageInfo msg_info;
118118
if (listener->get_subscription()->take(msg, msg_info)) {
119-
RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (waitset)", msg.data.c_str());
119+
RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (wait-set)", msg.data.c_str());
120120
}
121121
}
122122
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
123-
RCLCPP_ERROR(listener->get_logger(), "No message received after 5s.");
124-
} else {
125-
RCLCPP_ERROR(listener->get_logger(), "Wait-set failed.");
123+
if (rclcpp::ok()) {
124+
RCLCPP_ERROR(listener->get_logger(), "Wait-set failed with timeout");
125+
}
126126
}
127127
}
128128

rclcpp/wait_set/wait_set_different_rates_topics.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ int32_t main(const int32_t argc, char ** const argv)
5353
{
5454
rclcpp::init(argc, argv);
5555

56-
auto node = std::make_shared<rclcpp::Node>("waitset_node");
56+
auto node = std::make_shared<rclcpp::Node>("wait_set_listener");
5757
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};
5858

5959
auto sub1 = node->create_subscription<std_msgs::msg::String>("topicA", 10, do_nothing);
@@ -75,9 +75,7 @@ int32_t main(const int32_t argc, char ** const argv)
7575
auto publisher_thread = std::thread([&exec]() {exec.spin();});
7676

7777
while (rclcpp::ok()) {
78-
// Waiting up to 5s for a message to arrive
79-
const auto wait_result = wait_set.wait(std::chrono::seconds(5));
80-
78+
const auto wait_result = wait_set.wait(3s);
8179
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
8280
bool sub2_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[1U];
8381
bool sub3_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[2U];
@@ -98,7 +96,7 @@ int32_t main(const int32_t argc, char ** const argv)
9896
handled_data.append(msg1.data);
9997
}
10098
handled_data.append(msg2.data);
101-
RCLCPP_INFO(node->get_logger(), "Handle topic A and B: %s", handled_data.c_str());
99+
RCLCPP_INFO(node->get_logger(), "I heard: %s", handled_data.c_str());
102100
} else {
103101
RCLCPP_ERROR(node->get_logger(), "An invalid message from topic B was received.");
104102
}
@@ -109,15 +107,15 @@ int32_t main(const int32_t argc, char ** const argv)
109107
std_msgs::msg::String msg;
110108
rclcpp::MessageInfo msg_info;
111109
if (sub3->take(msg, msg_info)) {
112-
RCLCPP_INFO(node->get_logger(), "Handle topic C: %s", msg.data.c_str());
110+
RCLCPP_INFO(node->get_logger(), "I heard: %s", msg.data.c_str());
113111
} else {
114112
RCLCPP_ERROR(node->get_logger(), "An invalid message from topic C was received.");
115113
}
116114
}
117115
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
118-
RCLCPP_ERROR(node->get_logger(), "No message received after 5s.");
119-
} else {
120-
RCLCPP_ERROR(node->get_logger(), "Wait-set failed.");
116+
if (rclcpp::ok()) {
117+
RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout");
118+
}
121119
}
122120
}
123121

rclcpp/wait_set/wait_set_example.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ int32_t main(const int32_t argc, char ** const argv)
2525
{
2626
rclcpp::init(argc, argv);
2727

28-
auto node = std::make_shared<rclcpp::Node>("wait_set_example_node");
28+
auto node = std::make_shared<rclcpp::Node>("wait_set_listener");
2929
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};
3030

3131
auto sub1 = node->create_subscription<std_msgs::msg::String>("topicA", 1, do_nothing);
@@ -74,9 +74,7 @@ int32_t main(const int32_t argc, char ** const argv)
7474

7575
auto num_recv = std::size_t();
7676
while (num_recv < 3U) {
77-
// Waiting up to 5s for a sample to arrive.
78-
const auto wait_result = wait_set.wait(std::chrono::seconds(5));
79-
77+
const auto wait_result = wait_set.wait(2s);
8078
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
8179
if (wait_result.get_wait_set().get_rcl_wait_set().timers[0U]) {
8280
// we execute manually the timer callback
@@ -111,9 +109,9 @@ int32_t main(const int32_t argc, char ** const argv)
111109
RCLCPP_INFO(node->get_logger(), "Number of messages already got: %zu of 3", num_recv);
112110
}
113111
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
114-
RCLCPP_ERROR(node->get_logger(), "No message received after 5s.");
115-
} else {
116-
RCLCPP_ERROR(node->get_logger(), "Wait-set failed.");
112+
if (rclcpp::ok()) {
113+
RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout");
114+
}
117115
}
118116
}
119117
RCLCPP_INFO(node->get_logger(), "Got all messages!");

0 commit comments

Comments
 (0)