Skip to content

Commit c6d04f7

Browse files
skyegalaxyMauro Passerinomauropassealsorajefferyyjhsu
authored
Cherry-picks and changes from "irobot/iron" for Jazzy (#158)
* Add logs on failed take response/request (#107) * Ignore local endpoints (#131) * Refs #18846: PoC ignore local endpoints: extend RCLCPP API Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com> * Refs #18846: PoC ignore local endpoints: modify RCLCPP publish logic Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com> --------- Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com> Co-authored-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com> (cherry picked from commit 106c03a) style * Support intra-process communication: Clients & Services (ros2#1847) Signed-off-by: Mauro Passerino <mpasserino@irobot.com> (cherry picked from commit 58d2a04) Remove redundant `add_subscription` cherry pick artifact use const ref instead of ptr, add missing capacity fn more style * Jazzy recreation of "Add action client/server IPC support" (aeacde9) * add override attribute to some ipc methods Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * clear intra-process manager on client/server destructors (#94) (cherry picked from commit 378223d) * move ipc lock to appropriate position in client.hpp (cherry picked from commit 9de603e) * Actions: Use ipc_setting = rclcpp::IntraProcessSetting::NodeDefault (#133) Co-authored-by: Mauro Passerino <mpasserino@irobot.com> * Allow for deferred responses with ipc (#135) * allow for deferred responses with ipc * fix send response * fix use member service ipc process * add map to store CallbackInfoVariant * add send_response to base class, set function ptr to get handle * move service intra process outside base * copy response into shared pointer for ipc * add typename for service template * remove ref signature * try without std::ref * try without ref wrapper * try emplace * make pair with variant * some cleanup * erase callback info if client invalid * add post_init_setup for services * fix extra comma * add post init setup after lifecycle node services * add documentation for ServiceIntraProcess template change * use weak ptr to service in ServiceIntraProcess * move check for valid service handle to beginning of function * add comment for callback info map (cherry picked from commit aa95a48) * check request header for intraprocess (#139) * check request header for intraprocess * set request header intraprocess to false in execute_service (cherry picked from commit a617f93) * Include namespaces in service names (#140) * Include node namespace in IPC Action service name * Include node namespace in IPC Client/Service service name --------- Co-authored-by: Mauro Passerino <mpasserino@irobot.com> (cherry picked from commit f5b2001) remove whitespace in service.hpp * Fix mutltiple client requests (#142) * store map of unique request id to client id and callback info pair * fix map end check * fix undefined reference * remove unnecessary request id erase, remove/fix unique id comment * improve unique id comment (cherry picked from commit b865383) * RECREATION OF Fixes for intra-process actions (#144) action client / server ipc decrustification whitespace / line length / uncrustify - service_intra_process const correctness * add logs and minor fixes (#146) * add logs and minor fixes Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * use >0 rather than ==1 in comparison Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * correct template syntax Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> (cherry picked from commit d4dd4e4) * avoid adding notify waitable twice to events-executor collection (ros2#2564) * avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * remove redundant mutex lock Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> (cherry picked from commit f27bdbf) * Fix bug in timers lifecycle for events executor (ros2#2586) * Remove expired timers before updating the collection Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Add regression test for reinitialized timers bug Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Add missing includes Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Relocate test under the executors directory Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Extend test to run with all supported executors Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Adjust comment in fix to make it more generic Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Apply ament clang format to test Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> * Fix uncrustify findings Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> --------- Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> Co-authored-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> (cherry picked from commit 9ef9646) * Bring lock_free_events_queue to rclcpp from the events_executor repo (#149) Co-authored-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> (cherry picked from commit 30050c1) exclude lock free queue from linting actually lint lock_free_events_queue lock_free_events_queue copyright whitespace / line length / uncrustify - lock_free_events_queue * Add test_actions (#150) Co-authored-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> (cherry picked from commit aef928d) * Jazzy - Action IPC Fixes (See 7a51f00) whitespace / line length / uncrustify - rclcpp_action client whitespace / line length / uncrustify - rclcpp_action server fix duplicate member definitions in rclcpp_action server.hpp fixes for IPC action server delete log whitespace / line length / uncrustify - rclcpp action server again minor action client fixes whitespace / line length / uncrustify - rclcpp action client again * Always publish inter-process on TRANSIENT_LOCAL pubs (#152) * Mauro/irobot iron fixes (#155) whitespace / line length / uncrustify - publisher proper inter_process_publish_needed whitespace / line length / uncrustify - rclcpp publisher * Add test_actions & test_transient_local (#157) Co-authored-by: Mauro Passerino <mpasserino@irobot.com> (cherry picked from commit cf182e0) whitespace / line length / uncrustify - test_actions and test_transient_local whitespace / line length / uncrustify - test actions and transient_local again remove redundant test_actions * Call service post_init_setup in test_service.cpp --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> Signed-off-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com> Co-authored-by: Mauro Passerino <mpasserino@irobot.com> Co-authored-by: mauropasse <mauropasse@hotmail.com> Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com> Co-authored-by: Jeffery Hsu <jefferyyjhsu@gmail.com> Co-authored-by: bpwilcox <bpwonline28@gmail.com> Co-authored-by: Alexis Pojomovsky <apojomovsky@gmail.com> Co-authored-by: Alexis Pojomovsky <apojomovsky@ekumenlabs.com>
1 parent 8266f85 commit c6d04f7

File tree

62 files changed

+10433
-221
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+10433
-221
lines changed

rclcpp/CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,12 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
3939
endif()
4040

4141
set(${PROJECT_NAME}_SRCS
42+
src/rclcpp/action_client_intra_process_base.cpp
43+
src/rclcpp/action_server_intra_process_base.cpp
4244
src/rclcpp/any_executable.cpp
4345
src/rclcpp/callback_group.cpp
4446
src/rclcpp/client.cpp
47+
src/rclcpp/client_intra_process_base.cpp
4548
src/rclcpp/clock.cpp
4649
src/rclcpp/context.cpp
4750
src/rclcpp/contexts/default_context.cpp
@@ -116,6 +119,7 @@ set(${PROJECT_NAME}_SRCS
116119
src/rclcpp/serialization.cpp
117120
src/rclcpp/serialized_message.cpp
118121
src/rclcpp/service.cpp
122+
src/rclcpp/service_intra_process_base.cpp
119123
src/rclcpp/signal_handler.cpp
120124
src/rclcpp/subscription_base.cpp
121125
src/rclcpp/subscription_intra_process_base.cpp
@@ -285,8 +289,14 @@ ament_export_dependencies(
285289

286290
if(BUILD_TESTING)
287291
find_package(ament_lint_auto REQUIRED)
292+
file(GLOB_RECURSE AMENT_LINT_AUTO_FILE_EXCLUDE
293+
include/rclcpp/experimental/executors/events_executor/concurrent_queue/*.h)
288294
ament_lint_auto_find_test_dependencies()
289295

296+
message(AUTHOR_WARNING
297+
"Ament lint auto tests are disabled on the following: "
298+
${AMENT_LINT_AUTO_FILE_EXCLUDE}
299+
)
290300
add_subdirectory(test)
291301
endif()
292302

rclcpp/include/rclcpp/client.hpp

Lines changed: 135 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@
3737

3838
#include "rclcpp/clock.hpp"
3939
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
40+
#include "rclcpp/detail/resolve_use_intra_process.hpp"
4041
#include "rclcpp/exceptions.hpp"
4142
#include "rclcpp/expand_topic_or_service_name.hpp"
43+
#include "rclcpp/experimental/client_intra_process.hpp"
44+
#include "rclcpp/experimental/intra_process_manager.hpp"
4245
#include "rclcpp/function_traits.hpp"
46+
#include "rclcpp/intra_process_setting.hpp"
4347
#include "rclcpp/logging.hpp"
4448
#include "rclcpp/macros.hpp"
4549
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
@@ -48,6 +52,8 @@
4852
#include "rclcpp/utilities.hpp"
4953
#include "rclcpp/visibility_control.hpp"
5054

55+
#include "rcutils/logging_macros.h"
56+
5157
#include "rmw/error_handling.h"
5258
#include "rmw/impl/cpp/demangle.hpp"
5359
#include "rmw/rmw.h"
@@ -277,6 +283,15 @@ class ClientBase
277283
rclcpp::QoS
278284
get_response_subscription_actual_qos() const;
279285

286+
/// Return the waitable for intra-process
287+
/**
288+
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
289+
* \throws std::runtime_error if the intra process manager is destroyed
290+
*/
291+
RCLCPP_PUBLIC
292+
rclcpp::Waitable::SharedPtr
293+
get_intra_process_waitable();
294+
280295
/// Set a callback to be called when each new response is received.
281296
/**
282297
* The callback receives a size_t which is the number of responses received
@@ -381,6 +396,19 @@ class ClientBase
381396
void
382397
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);
383398

399+
using IntraProcessManagerWeakPtr =
400+
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
401+
402+
/// Implementation detail.
403+
RCLCPP_PUBLIC
404+
void
405+
setup_intra_process(
406+
uint64_t intra_process_client_id,
407+
IntraProcessManagerWeakPtr weak_ipm);
408+
409+
std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
410+
std::atomic_uint ipc_sequence_number_{1};
411+
384412
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
385413
std::shared_ptr<rcl_node_t> node_handle_;
386414
std::shared_ptr<rclcpp::Context> context_;
@@ -396,6 +424,11 @@ class ClientBase
396424
std::shared_ptr<rcl_client_t> client_handle_;
397425

398426
std::atomic<bool> in_use_by_wait_set_{false};
427+
428+
std::recursive_mutex ipc_mutex_;
429+
bool use_intra_process_{false};
430+
IntraProcessManagerWeakPtr weak_ipm_;
431+
uint64_t intra_process_client_id_;
399432
};
400433

401434
template<typename ServiceT>
@@ -491,12 +524,14 @@ class Client : public ClientBase
491524
* \param[in] node_graph The node graph interface of the corresponding node.
492525
* \param[in] service_name Name of the topic to publish to.
493526
* \param[in] client_options options for the subscription.
527+
* \param[in] ipc_setting Intra-process communication setting for the client.
494528
*/
495529
Client(
496530
rclcpp::node_interfaces::NodeBaseInterface * node_base,
497531
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
498532
const std::string & service_name,
499-
rcl_client_options_t & client_options)
533+
rcl_client_options_t & client_options,
534+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
500535
: ClientBase(node_base, node_graph),
501536
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
502537
{
@@ -519,10 +554,27 @@ class Client : public ClientBase
519554
}
520555
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
521556
}
557+
558+
// Setup intra process if requested.
559+
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
560+
create_intra_process_client();
561+
}
522562
}
523563

524564
virtual ~Client()
525565
{
566+
if (!use_intra_process_) {
567+
return;
568+
}
569+
auto ipm = weak_ipm_.lock();
570+
if (!ipm) {
571+
// TODO(ivanpauno): should this raise an error?
572+
RCLCPP_WARN(
573+
rclcpp::get_logger("rclcpp"),
574+
"Intra process manager died before than a client.");
575+
return;
576+
}
577+
ipm->remove_client(intra_process_client_id_);
526578
}
527579

528580
/// Take the next response for this client.
@@ -639,7 +691,7 @@ class Client : public ClientBase
639691
Promise promise;
640692
auto future = promise.get_future();
641693
auto req_id = async_send_request_impl(
642-
*request,
694+
std::move(request),
643695
std::move(promise));
644696
return FutureAndRequestId(std::move(future), req_id);
645697
}
@@ -674,7 +726,7 @@ class Client : public ClientBase
674726
Promise promise;
675727
auto shared_future = promise.get_future().share();
676728
auto req_id = async_send_request_impl(
677-
*request,
729+
std::move(request),
678730
std::make_tuple(
679731
CallbackType{std::forward<CallbackT>(cb)},
680732
shared_future,
@@ -705,7 +757,7 @@ class Client : public ClientBase
705757
PromiseWithRequest promise;
706758
auto shared_future = promise.get_future().share();
707759
auto req_id = async_send_request_impl(
708-
*request,
760+
request,
709761
std::make_tuple(
710762
CallbackWithRequestType{std::forward<CallbackT>(cb)},
711763
request,
@@ -839,11 +891,35 @@ class Client : public ClientBase
839891
CallbackWithRequestTypeValueVariant>;
840892

841893
int64_t
842-
async_send_request_impl(const Request & request, CallbackInfoVariant value)
894+
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
843895
{
896+
{
897+
std::lock_guard<std::recursive_mutex> lock(ipc_mutex_);
898+
if (use_intra_process_) {
899+
auto ipm = weak_ipm_.lock();
900+
if (!ipm) {
901+
throw std::runtime_error(
902+
"intra process send called after destruction of intra process manager");
903+
}
904+
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);
905+
906+
// Check if there's an intra-process server available matching this client.
907+
// If there's not, we fall back into inter-process communication, since
908+
// the server might be available in another process or was configured to not use IPC.
909+
if (intra_process_server_available) {
910+
// Send intra-process request
911+
ipm->template send_intra_process_client_request<ServiceT>(
912+
intra_process_client_id_,
913+
std::make_pair(std::move(request), std::move(value)));
914+
return ipc_sequence_number_++;
915+
}
916+
}
917+
}
918+
919+
// Send inter-process request
844920
int64_t sequence_number;
845921
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
846-
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
922+
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
847923
if (RCL_RET_OK != ret) {
848924
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
849925
}
@@ -869,6 +945,59 @@ class Client : public ClientBase
869945
return value;
870946
}
871947

948+
void
949+
create_intra_process_client()
950+
{
951+
// Check if the QoS is compatible with intra-process.
952+
auto qos_profile = get_response_subscription_actual_qos();
953+
954+
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
955+
throw std::invalid_argument(
956+
"intraprocess communication allowed only with keep last history qos policy");
957+
}
958+
if (qos_profile.depth() == 0) {
959+
throw std::invalid_argument(
960+
"intraprocess communication is not allowed with 0 depth qos policy");
961+
}
962+
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
963+
throw std::invalid_argument(
964+
"intraprocess communication allowed only with volatile durability");
965+
}
966+
967+
// Create a ClientIntraProcess which will be given to the intra-process manager.
968+
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;
969+
970+
// Expand the given service name.
971+
char * remapped_service_name = NULL;
972+
rcl_allocator_t allocator = rcl_get_default_allocator();
973+
974+
rcl_ret_t ret = rcl_node_resolve_name(
975+
this->get_rcl_node_handle(),
976+
this->get_service_name(),
977+
allocator,
978+
true,
979+
false,
980+
&remapped_service_name);
981+
982+
if (RCL_RET_OK != ret) {
983+
allocator.deallocate(remapped_service_name, allocator.state);
984+
rclcpp::exceptions::throw_from_rcl_error(ret, "client failed to resolve service name");
985+
}
986+
987+
client_intra_process_ = std::make_shared<ClientIntraProcessT>(
988+
context_,
989+
remapped_service_name,
990+
qos_profile);
991+
992+
allocator.deallocate(remapped_service_name, allocator.state);
993+
994+
// Add it to the intra process manager.
995+
using rclcpp::experimental::IntraProcessManager;
996+
auto ipm = context_->get_sub_context<IntraProcessManager>();
997+
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
998+
this->setup_intra_process(intra_process_client_id, ipm);
999+
}
1000+
8721001
RCLCPP_DISABLE_COPY(Client)
8731002

8741003
std::unordered_map<

rclcpp/include/rclcpp/create_client.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ create_client(
4646
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
4747
const std::string & service_name,
4848
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
49-
rclcpp::CallbackGroup::SharedPtr group = nullptr)
49+
rclcpp::CallbackGroup::SharedPtr group = nullptr,
50+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
5051
{
5152
return create_client<ServiceT>(
5253
node_base, node_graph, node_services,
5354
service_name,
5455
qos.get_rmw_qos_profile(),
55-
group);
56+
group,
57+
ipc_setting);
5658
}
5759

5860
/// Create a service client with a given type.
@@ -65,7 +67,8 @@ create_client(
6567
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
6668
const std::string & service_name,
6769
const rmw_qos_profile_t & qos_profile,
68-
rclcpp::CallbackGroup::SharedPtr group)
70+
rclcpp::CallbackGroup::SharedPtr group,
71+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
6972
{
7073
rcl_client_options_t options = rcl_client_get_default_options();
7174
options.qos = qos_profile;
@@ -74,7 +77,8 @@ create_client(
7477
node_base.get(),
7578
node_graph,
7679
service_name,
77-
options);
80+
options,
81+
ipc_setting);
7882

7983
auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
8084
node_services->add_client(cli_base_ptr, group);

rclcpp/include/rclcpp/create_service.hpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ create_service(
4646
const std::string & service_name,
4747
CallbackT && callback,
4848
const rclcpp::QoS & qos,
49-
rclcpp::CallbackGroup::SharedPtr group)
49+
rclcpp::CallbackGroup::SharedPtr group,
50+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
5051
{
5152
return create_service<ServiceT, CallbackT>(
5253
node_base, node_services, service_name,
53-
std::forward<CallbackT>(callback), qos.get_rmw_qos_profile(), group);
54+
std::forward<CallbackT>(callback), qos.get_rmw_qos_profile(), group, ipc_setting);
5455
}
5556

5657
/// Create a service with a given type.
@@ -63,7 +64,8 @@ create_service(
6364
const std::string & service_name,
6465
CallbackT && callback,
6566
const rmw_qos_profile_t & qos_profile,
66-
rclcpp::CallbackGroup::SharedPtr group)
67+
rclcpp::CallbackGroup::SharedPtr group,
68+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
6769
{
6870
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
6971
any_service_callback.set(std::forward<CallbackT>(callback));
@@ -72,8 +74,9 @@ create_service(
7274
service_options.qos = qos_profile;
7375

7476
auto serv = Service<ServiceT>::make_shared(
75-
node_base->get_shared_rcl_node_handle(),
77+
node_base,
7678
service_name, any_service_callback, service_options);
79+
serv->post_init_setup(node_base, ipc_setting);
7780
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
7881
node_services->add_service(serv_base_ptr, group);
7982
return serv;

rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ namespace detail
2626
{
2727

2828
/// Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
29-
template<typename OptionsT, typename NodeBaseT>
29+
template<typename NodeBaseT>
3030
bool
31-
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
31+
resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base)
3232
{
3333
bool use_intra_process;
34-
switch (options.use_intra_process_comm) {
34+
switch (ipc_setting) {
3535
case IntraProcessSetting::Enable:
3636
use_intra_process = true;
3737
break;

0 commit comments

Comments
 (0)