Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,15 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
event_handlers_[event_type]->clear_on_ready_callback();
}

/// Check if content filtered topic feature of the subscription instance is supported.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is
* supported.
*/
RCLCPP_PUBLIC
bool
is_cft_supported() const;

/// Check if content filtered topic feature of the subscription instance is enabled.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is enabled.
Expand Down
4 changes: 4 additions & 0 deletions rclcpp/src/rclcpp/parameter_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ ParameterEventHandler::add_parameter_callback(
bool
ParameterEventHandler::configure_nodes_filter(const std::vector<std::string> & node_names)
{
if (!event_subscription_->is_cft_supported()) {
return false;
}

if (node_names.empty()) {
// Clear content filter
event_subscription_->set_content_filter("");
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,12 @@ SubscriptionBase::set_on_new_message_callback(
}
}

bool
SubscriptionBase::is_cft_supported() const
{
return rcl_subscription_is_cft_supported(subscription_handle_.get());
}

bool
SubscriptionBase::is_cft_enabled() const
{
Expand Down
116 changes: 59 additions & 57 deletions rclcpp/test/rclcpp/test_subscription_content_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::Bas
m1.uint64_value == m2.uint64_value;
}

TEST_F(TestContentFilterSubscription, is_cft_supported)
{
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, false);
EXPECT_FALSE(sub->is_cft_supported());
}

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, true);
EXPECT_TRUE(sub->is_cft_supported());
}
}

TEST_F(TestContentFilterSubscription, is_cft_enabled)
{
{
Expand Down Expand Up @@ -165,7 +180,7 @@ TEST_F(TestContentFilterSubscription, get_content_filter)

TEST_F(TestContentFilterSubscription, set_content_filter)
{
if (sub->is_cft_enabled()) {
if (sub->is_cft_supported()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
} else {
Expand All @@ -178,8 +193,10 @@ TEST_F(TestContentFilterSubscription, set_content_filter)
TEST_F(TestContentFilterSubscription, content_filter_get_begin)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();

// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}

using namespace std::chrono_literals;
Expand All @@ -206,28 +223,27 @@ TEST_F(TestContentFilterSubscription, content_filter_get_begin)
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);

if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));

test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 3;
pub->publish(original_message);
test_msgs::msg::BasicTypes message_to_be_filtered;
message_to_be_filtered.int32_value = original_message.int32_value;
pub->publish(message_to_be_filtered);

test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_FALSE(receive);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_FALSE(receive);
}
}

TEST_F(TestContentFilterSubscription, content_filter_get_later)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();
// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}

using namespace std::chrono_literals;
Expand All @@ -251,36 +267,30 @@ TEST_F(TestContentFilterSubscription, content_filter_get_later)

test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
if (sub->is_cft_enabled()) {
EXPECT_FALSE(receive);
} else {
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
EXPECT_FALSE(receive);

if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));

test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 4;
pub->publish(original_message);
test_msgs::msg::BasicTypes matching_message;
matching_message.int32_value = original_message.int32_value;
pub->publish(matching_message);

test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(matching_message, output_message2);
}
}

TEST_F(TestContentFilterSubscription, content_filter_reset)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();
// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}

using namespace std::chrono_literals;
Expand All @@ -304,28 +314,20 @@ TEST_F(TestContentFilterSubscription, content_filter_reset)

test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
if (sub->is_cft_enabled()) {
EXPECT_FALSE(receive);
} else {
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
EXPECT_FALSE(receive);

if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(""));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(sub->set_content_filter(""));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));

test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 4;
pub->publish(original_message);
test_msgs::msg::BasicTypes unfiltered_message;
unfiltered_message.int32_value = original_message.int32_value;
pub->publish(unfiltered_message);

test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(unfiltered_message, output_message2);
}
}

Expand Down