From 9ad34abadccf51ab233a0ae427added69a6da8dd Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 10 Mar 2026 13:34:33 +0800 Subject: [PATCH] Add support check for content filter feature in subscription Signed-off-by: Barry Xu --- rclcpp/include/rclcpp/subscription_base.hpp | 9 ++ rclcpp/src/rclcpp/parameter_event_handler.cpp | 4 + rclcpp/src/rclcpp/subscription_base.cpp | 6 + .../test_subscription_content_filter.cpp | 116 +++++++++--------- 4 files changed, 78 insertions(+), 57 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index ace5dfdc44..178d677eb1 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -551,6 +551,15 @@ class SubscriptionBase : public std::enable_shared_from_this event_handlers_[event_type]->clear_on_ready_callback(); } + /// Check if content filtered topic feature of the subscription instance is supported. + /** + * \return boolean flag indicating if the content filtered topic of this subscription is + * supported. + */ + RCLCPP_PUBLIC + bool + is_cft_supported() const; + /// Check if content filtered topic feature of the subscription instance is enabled. /** * \return boolean flag indicating if the content filtered topic of this subscription is enabled. diff --git a/rclcpp/src/rclcpp/parameter_event_handler.cpp b/rclcpp/src/rclcpp/parameter_event_handler.cpp index 314ae5593d..edb6a7bf04 100644 --- a/rclcpp/src/rclcpp/parameter_event_handler.cpp +++ b/rclcpp/src/rclcpp/parameter_event_handler.cpp @@ -77,6 +77,10 @@ ParameterEventHandler::add_parameter_callback( bool ParameterEventHandler::configure_nodes_filter(const std::vector & node_names) { + if (!event_subscription_->is_cft_supported()) { + return false; + } + if (node_names.empty()) { // Clear content filter event_subscription_->set_content_filter(""); diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index a3086efebe..35188477d2 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -480,6 +480,12 @@ SubscriptionBase::set_on_new_message_callback( } } +bool +SubscriptionBase::is_cft_supported() const +{ + return rcl_subscription_is_cft_supported(subscription_handle_.get()); +} + bool SubscriptionBase::is_cft_enabled() const { diff --git a/rclcpp/test/rclcpp/test_subscription_content_filter.cpp b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp index 7fa3507ff7..1c10995822 100644 --- a/rclcpp/test/rclcpp/test_subscription_content_filter.cpp +++ b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp @@ -108,6 +108,21 @@ bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::Bas m1.uint64_value == m2.uint64_value; } +TEST_F(TestContentFilterSubscription, is_cft_supported) +{ + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_supported, false); + EXPECT_FALSE(sub->is_cft_supported()); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_supported, true); + EXPECT_TRUE(sub->is_cft_supported()); + } +} + TEST_F(TestContentFilterSubscription, is_cft_enabled) { { @@ -165,7 +180,7 @@ TEST_F(TestContentFilterSubscription, get_content_filter) TEST_F(TestContentFilterSubscription, set_content_filter) { - if (sub->is_cft_enabled()) { + if (sub->is_cft_supported()) { EXPECT_NO_THROW( sub->set_content_filter(filter_expression_init, expression_parameters_2)); } else { @@ -178,8 +193,10 @@ TEST_F(TestContentFilterSubscription, set_content_filter) TEST_F(TestContentFilterSubscription, content_filter_get_begin) { std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier()); - if (rmw_implementation_str == "rmw_zenoh_cpp") { - GTEST_SKIP(); + + // Run test only if rmw implementation supports content filter, otherwise skip it. + if (!sub->is_cft_supported()) { + GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter"; } using namespace std::chrono_literals; @@ -206,28 +223,27 @@ TEST_F(TestContentFilterSubscription, content_filter_get_begin) EXPECT_TRUE(receive); EXPECT_EQ(original_message, output_message); - if (sub->is_cft_enabled()) { - EXPECT_NO_THROW( - sub->set_content_filter(filter_expression_init, expression_parameters_2)); - // waiting to allow for filter propagation - std::this_thread::sleep_for(std::chrono::seconds(10)); + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); - test_msgs::msg::BasicTypes original_message; - original_message.int32_value = 3; - pub->publish(original_message); + test_msgs::msg::BasicTypes message_to_be_filtered; + message_to_be_filtered.int32_value = original_message.int32_value; + pub->publish(message_to_be_filtered); - test_msgs::msg::BasicTypes output_message; - bool receive = wait_for_message(output_message, sub, context, 10s); - EXPECT_FALSE(receive); - } + test_msgs::msg::BasicTypes output_message2; + receive = wait_for_message(output_message2, sub, context, 10s); + EXPECT_FALSE(receive); } } TEST_F(TestContentFilterSubscription, content_filter_get_later) { std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier()); - if (rmw_implementation_str == "rmw_zenoh_cpp") { - GTEST_SKIP(); + // Run test only if rmw implementation supports content filter, otherwise skip it. + if (!sub->is_cft_supported()) { + GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter"; } using namespace std::chrono_literals; @@ -251,36 +267,30 @@ TEST_F(TestContentFilterSubscription, content_filter_get_later) test_msgs::msg::BasicTypes output_message; bool receive = wait_for_message(output_message, sub, context, 10s); - if (sub->is_cft_enabled()) { - EXPECT_FALSE(receive); - } else { - EXPECT_TRUE(receive); - EXPECT_EQ(original_message, output_message); - } + EXPECT_FALSE(receive); - if (sub->is_cft_enabled()) { - EXPECT_NO_THROW( - sub->set_content_filter(filter_expression_init, expression_parameters_2)); - // waiting to allow for filter propagation - std::this_thread::sleep_for(std::chrono::seconds(10)); + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); - test_msgs::msg::BasicTypes original_message; - original_message.int32_value = 4; - pub->publish(original_message); + test_msgs::msg::BasicTypes matching_message; + matching_message.int32_value = original_message.int32_value; + pub->publish(matching_message); - test_msgs::msg::BasicTypes output_message; - bool receive = wait_for_message(output_message, sub, context, 10s); - EXPECT_TRUE(receive); - EXPECT_EQ(original_message, output_message); - } + test_msgs::msg::BasicTypes output_message2; + receive = wait_for_message(output_message2, sub, context, 10s); + EXPECT_TRUE(receive); + EXPECT_EQ(matching_message, output_message2); } } TEST_F(TestContentFilterSubscription, content_filter_reset) { std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier()); - if (rmw_implementation_str == "rmw_zenoh_cpp") { - GTEST_SKIP(); + // Run test only if rmw implementation supports content filter, otherwise skip it. + if (!sub->is_cft_supported()) { + GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter"; } using namespace std::chrono_literals; @@ -304,28 +314,20 @@ TEST_F(TestContentFilterSubscription, content_filter_reset) test_msgs::msg::BasicTypes output_message; bool receive = wait_for_message(output_message, sub, context, 10s); - if (sub->is_cft_enabled()) { - EXPECT_FALSE(receive); - } else { - EXPECT_TRUE(receive); - EXPECT_EQ(original_message, output_message); - } + EXPECT_FALSE(receive); - if (sub->is_cft_enabled()) { - EXPECT_NO_THROW( - sub->set_content_filter("")); - // waiting to allow for filter propagation - std::this_thread::sleep_for(std::chrono::seconds(10)); + EXPECT_NO_THROW(sub->set_content_filter("")); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); - test_msgs::msg::BasicTypes original_message; - original_message.int32_value = 4; - pub->publish(original_message); + test_msgs::msg::BasicTypes unfiltered_message; + unfiltered_message.int32_value = original_message.int32_value; + pub->publish(unfiltered_message); - test_msgs::msg::BasicTypes output_message; - bool receive = wait_for_message(output_message, sub, context, 10s); - EXPECT_TRUE(receive); - EXPECT_EQ(original_message, output_message); - } + test_msgs::msg::BasicTypes output_message2; + receive = wait_for_message(output_message2, sub, context, 10s); + EXPECT_TRUE(receive); + EXPECT_EQ(unfiltered_message, output_message2); } }