Skip to content

Commit bab1ea2

Browse files
alsoraskyegalaxy
authored and committed
avoid adding notify waitable twice to events-executor collection (ros2#2564)
* avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * remove redundant mutex lock Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> (cherry picked from commit f27bdbf)
1 parent 7c1b2f8 commit bab1ea2

File tree

2 files changed

+118
-6
lines changed

2 files changed

+118
-6
lines changed

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,14 +432,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
432432
// We could explicitly check for the notify waitable ID when we receive a waitable event
433433
// but I think that it's better if the waitable was in the collection and it could be
434434
// retrieved in the "standard" way.
435-
// To do it, we need to add the notify waitable as an entry in both the new and
436-
// current collections such that it's neither added or removed.
435+
// To do it, we need to add the notify waitable as an entry in the new collection
436+
// such that it's neither added or removed (it should have already been added
437+
// to the current collection in the constructor)
437438
this->add_notify_waitable_to_collection(new_collection.waitables);
438439

439-
// Acquire lock before modifying the current collection
440-
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
441-
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
442-
443440
this->refresh_current_collection(new_collection);
444441
}
445442

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,121 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
694694
}
695695
}
696696

697+
// This test verifies the thread-safety of adding and removing a node
698+
// while the executor is spinning and events are ready.
699+
// This test does not contain expectations, but rather it verifies that
700+
// we can run a "stressful routine" without crashing.
701+
TYPED_TEST(TestExecutors, stressAddRemoveNode)
702+
{
703+
using ExecutorType = TypeParam;
704+
// rmw_connextdds doesn't support events-executor
705+
if (
706+
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
707+
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
708+
{
709+
GTEST_SKIP();
710+
}
711+
712+
ExecutorType executor;
713+
714+
// A timer that is "always" ready (the timer callback doesn't do anything)
715+
auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
716+
717+
// This thread spins the executor until it's cancelled
718+
std::thread spinner_thread([&]() {
719+
executor.spin();
720+
});
721+
722+
// This thread publishes data in a busy loop (the node has a subscription)
723+
std::thread publisher_thread1([&]() {
724+
for (size_t i = 0; i < 100000; i++) {
725+
this->publisher->publish(test_msgs::msg::Empty());
726+
}
727+
});
728+
std::thread publisher_thread2([&]() {
729+
for (size_t i = 0; i < 100000; i++) {
730+
this->publisher->publish(test_msgs::msg::Empty());
731+
}
732+
});
733+
734+
// This thread adds/remove the node that contains the entities in a busy loop
735+
std::thread add_remove_thread([&]() {
736+
for (size_t i = 0; i < 100000; i++) {
737+
executor.add_node(this->node);
738+
executor.remove_node(this->node);
739+
}
740+
});
741+
742+
// Wait for the threads that do real work to finish
743+
publisher_thread1.join();
744+
publisher_thread2.join();
745+
add_remove_thread.join();
746+
747+
executor.cancel();
748+
spinner_thread.join();
749+
}
750+
751+
// Check that executors are correctly notified while they are spinning
752+
// we notify twice to ensure that the notify waitable is still working
753+
// after the first notification
754+
TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning)
755+
{
756+
using ExecutorType = TypeParam;
757+
758+
// Create executor, add the node and start spinning
759+
ExecutorType executor;
760+
executor.add_node(this->node);
761+
std::thread spinner([&]() {executor.spin();});
762+
763+
// Wait for executor to be spinning
764+
while (!executor.is_spinning()) {
765+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
766+
}
767+
768+
// Create the first subscription while the executor is already spinning
769+
std::atomic<size_t> sub1_msg_count {0};
770+
auto sub1 = this->node->template create_subscription<test_msgs::msg::Empty>(
771+
this->publisher->get_topic_name(),
772+
rclcpp::QoS(10),
773+
[&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
774+
sub1_msg_count++;
775+
});
776+
777+
// Publish a message and verify it's received
778+
this->publisher->publish(test_msgs::msg::Empty());
779+
auto start = std::chrono::steady_clock::now();
780+
while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) {
781+
std::this_thread::sleep_for(1ms);
782+
}
783+
EXPECT_EQ(sub1_msg_count, 1u);
784+
785+
// Create a second subscription while the executor is already spinning
786+
std::atomic<size_t> sub2_msg_count {0};
787+
auto sub2 = this->node->template create_subscription<test_msgs::msg::Empty>(
788+
this->publisher->get_topic_name(),
789+
rclcpp::QoS(10),
790+
[&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
791+
sub2_msg_count++;
792+
});
793+
794+
// Publish a message and verify it's received by both subscriptions
795+
this->publisher->publish(test_msgs::msg::Empty());
796+
start = std::chrono::steady_clock::now();
797+
while (
798+
sub1_msg_count == 1 &&
799+
sub2_msg_count == 0 &&
800+
(std::chrono::steady_clock::now() - start) < 10s)
801+
{
802+
std::this_thread::sleep_for(1ms);
803+
}
804+
EXPECT_EQ(sub1_msg_count, 2u);
805+
EXPECT_EQ(sub2_msg_count, 1u);
806+
807+
// Cancel needs to be called before join, so that executor.spin() returns.
808+
executor.cancel();
809+
spinner.join();
810+
}
811+
697812
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
698813
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
699814
{

0 commit comments

Comments
 (0)