From b8ce3c61ff5cdcfb9d5476cd7d4d541839427320 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 31 Dec 2024 19:10:36 +0800 Subject: [PATCH 1/8] feat: introduce the advanced publisher/subscriber --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 34 --- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 9 - .../src/detail/rmw_publisher_data.cpp | 64 +++--- .../src/detail/rmw_publisher_data.hpp | 9 +- .../src/detail/rmw_subscription_data.cpp | 214 +++++------------- .../src/detail/rmw_subscription_data.hpp | 4 +- 6 files changed, 84 insertions(+), 250 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index abadd967..f2f43034 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -1253,38 +1253,4 @@ void GraphCache::update_event_counters( } } } - -///============================================================================= -void GraphCache::set_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyxpr_hash, - QueryingSubscriberCallback cb) -{ - std::unordered_map< - std::string, - std::unordered_map - >::iterator cb_it = querying_subs_cbs_.find(sub_keyexpr); - if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[sub_keyexpr] = - std::unordered_map{}; - cb_it = querying_subs_cbs_.find(sub_keyexpr); - } - cb_it->second.insert(std::make_pair(sub_keyxpr_hash, std::move(cb))); -} - -///============================================================================= -void GraphCache::remove_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash) -{ - std::unordered_map< - std::string, - std::unordered_map - >::iterator cb_map_it = querying_subs_cbs_.find(sub_keyexpr); - if (cb_map_it == querying_subs_cbs_.end()) { - return; - } - cb_map_it->second.erase(sub_keyexpr_hash); -} - } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 22fffe20..b6bfd059 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -189,15 +189,6 @@ class GraphCache final /// Returns true if the entity is a publisher or client. False otherwise. static bool is_entity_pub(const liveliness::Entity & entity); - void set_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash, - QueryingSubscriberCallback cb); - - void remove_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash); - private: // Helper function to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f960b879..a17bc07d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -107,40 +107,35 @@ std::shared_ptr PublisherData::make( return nullptr; } - zenoh::ZResult result; - std::optional pub_cache; - zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); + auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default(); + adv_pub_opts.publisher_detection = true; + adv_pub_opts.sample_miss_detection = true; + // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - zenoh::ext::SessionExt::PublicationCacheOptions pub_cache_opts = - zenoh::ext::SessionExt::PublicationCacheOptions::create_default(); - - pub_cache_opts.history = adapted_qos_profile.depth; - pub_cache_opts.queryable_complete = true; - - std::string queryable_prefix = entity->zid(); - pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); - - pub_cache = session->ext().declare_publication_cache( - pub_ke, std::move(pub_cache_opts), &result); - - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); - return nullptr; - } + adv_pub_opts.cache = zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); + adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; } + zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Set congestion_control to BLOCK if appropriate. - zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + auto pub_opts = zenoh::Session::PublisherOptions::create_default(); + pub_opts.congestion_control = Z_CONGESTION_CONTROL_DROP; if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - opts.reliability = Z_RELIABILITY_RELIABLE; - + pub_opts.reliability = Z_RELIABILITY_RELIABLE; if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { - opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + pub_opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - auto pub = session->declare_publisher(pub_ke, std::move(opts), &result); + adv_pub_opts.publisher_options = pub_opts; + + zenoh::ZResult result; + auto adv_pub = session->ext().declare_advanced_publisher( + pub_ke, std::move(adv_pub_opts), &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } if (result != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); @@ -165,8 +160,7 @@ std::shared_ptr PublisherData::make( node, std::move(entity), std::move(session), - std::move(pub), - std::move(pub_cache), + std::move(adv_pub), std::move(token), type_support->data, std::move(message_type_support) @@ -179,8 +173,7 @@ PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr sess, - zenoh::Publisher pub, - std::optional pub_cache, + zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) @@ -189,7 +182,6 @@ PublisherData::PublisherData( entity_(std::move(entity)), sess_(std::move(sess)), pub_(std::move(pub)), - pub_cache_(std::move(pub_cache)), token_(std::move(token)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), @@ -253,8 +245,8 @@ rmw_ret_t PublisherData::publish( // will be encoded with CDR so it does not really matter. zenoh::ZResult result; int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); - auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = rmw_zenoh_cpp::AttachmentData( + auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); // TODO(ahcorde): shmbuf @@ -265,7 +257,7 @@ rmw_ret_t PublisherData::publish( TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp); - pub_.put(std::move(payload), std::move(options), &result); + pub_.put(std::move(payload), std::move(opts), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( @@ -301,8 +293,8 @@ rmw_ret_t PublisherData::publish_serialized_message( // will be encoded with CDR so it does not really matter. zenoh::ZResult result; int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); - auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = rmw_zenoh_cpp::AttachmentData( + auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); std::vector raw_data( @@ -312,7 +304,7 @@ rmw_ret_t PublisherData::publish_serialized_message( TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), serialized_message, source_timestamp); - pub_.put(std::move(payload), std::move(options), &result); + pub_.put(std::move(payload), std::move(opts), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 7cec5406..ee3d2e13 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -95,8 +95,7 @@ class PublisherData final const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr session, - zenoh::Publisher pub, - std::optional pub_cache, + zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -111,10 +110,8 @@ class PublisherData final std::shared_ptr entity_; // A shared session. std::shared_ptr sess_; - // An owned publisher. - zenoh::Publisher pub_; - // Optional publication cache when durability is transient_local. - std::optional pub_cache_; + // An owned AdvancedPublisher. + zenoh::ext::AdvancedPublisher pub_; // Liveliness token for the publisher. std::optional token_; // Type support fields diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 2a070ee8..8bce00d8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -173,151 +173,59 @@ bool SubscriptionData::init() sess_ = context_impl->session(); + auto adv_sub_opts = zenoh::ext::SessionExt::AdvancedSubscriberOptions::create_default(); + adv_sub_opts.subscriber_detection = true; + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - zenoh::ext::SessionExt::QueryingSubscriberOptions sub_options = - zenoh::ext::SessionExt::QueryingSubscriberOptions::create_default(); - const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - zenoh::KeyExpr selector_ke(selector); - sub_options.query_keyexpr = std::move(selector_ke); - // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. - // By default a query accepts only replies that matches its query selector. - // This allows us to selectively query certain PublicationCaches when defining the - // set_querying_subscriber_callback below. - sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY; - // As this initial query is now using a "*", the query target is not COMPLETE. - sub_options.query_target = Z_QUERY_TARGET_ALL; - // We set consolidation to none as we need to receive transient local messages - // from a number of publishers. Eg: To receive TF data published over /tf_static - // by various publishers. - sub_options.query_consolidation = - zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); - - std::weak_ptr data_wp = shared_from_this(); - auto sub = context_impl->session()->ext().declare_querying_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - std::string(sample.get_keyexpr().as_string_view()).c_str()); - return; - } - - auto attachment = sample.get_attachment(); - if (!attachment.has_value()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") - return; - } - - auto attachment_value = attachment.value(); - AttachmentData attachment_data(attachment_value); - - sub_data->add_new_message( - std::make_unique( - sample.get_payload(), - get_system_time_in_ns(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return false; - } - sub_ = std::move(sub); - - // Register the querying subscriber with the graph cache to get latest - // messages from publishers that were discovered after their first publication. - graph_cache_->set_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash(), - [data_wp](const std::string & queryable_prefix) -> void - { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); - return; - } - std::lock_guard lock(sub_data->mutex_); - - const std::string selector = queryable_prefix + - "/" + - sub_data->entity_->topic_info().value().topic_keyexpr_; - RMW_ZENOH_LOG_DEBUG_NAMED( + adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); + adv_sub_opts.history = zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); + adv_sub_opts.history->detect_late_publishers = true; + adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; + adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{}; + adv_sub_opts.recovery->periodic_queries_period_ms = 1000; + } + + std::weak_ptr data_wp = shared_from_this(); + auto on_sample = [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "QueryingSubscriberCallback triggered over %s.", - selector.c_str() + "SubscriberCallback triggered over %s.", + std::string(sample.get_keyexpr().as_string_view()).c_str() ); - zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); - opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; - opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - - zenoh::ZResult result; - std::get>(sub_data->sub_.value()).get( - zenoh::KeyExpr(selector), - std::move(opts), - &result); - - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to get querying subscriber."); - return; - } + return; } - ); - } else { - zenoh::Session::SubscriberOptions sub_options = - zenoh::Session::SubscriberOptions::create_default(); - std::weak_ptr data_wp = shared_from_this(); - zenoh::Subscriber sub = context_impl->session()->declare_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); - return; - } - auto attachment = sample.get_attachment(); - if (!attachment.has_value()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") - return; - } - auto attachment_value = attachment.value(); - - AttachmentData attachment_data(attachment_value); - sub_data->add_new_message( - std::make_unique( - sample.get_payload(), - get_system_time_in_ns(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return false; - } - sub_ = std::move(sub); + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + auto attachment_value = attachment.value(); + + AttachmentData attachment_data(attachment_value); + sub_data->add_new_message( + std::make_unique( + sample.get_payload(), + get_system_time_in_ns(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }; + sub_ = context_impl->session()->ext().declare_advanced_subscriber( + sub_ke, + on_sample, + zenoh::closures::none, + std::move(adv_sub_opts), + &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return false; } // Publish to the graph that a new subscription is in town. @@ -391,11 +299,6 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } - // Remove the registered callback from the GraphCache if any. - graph_cache_->remove_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash() - ); // Remove any event callbacks registered to this subscription. graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); @@ -410,27 +313,12 @@ rmw_ret_t SubscriptionData::shutdown() } if (sub_.has_value()) { - zenoh::Subscriber * sub = std::get_if>(&sub_.value()); - if (sub != nullptr) { - std::move(*sub).undeclare(&result); - if (result != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare sub."); - return RMW_RET_ERROR; - } - } else { - zenoh::ext::QueryingSubscriber * sub = - std::get_if>(&sub_.value()); - if (sub != nullptr) { - std::move(*sub).undeclare(&result); - if (result != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare querying sub."); - return RMW_RET_ERROR; - } - } + std::move(sub_.value()).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare sub."); + return RMW_RET_ERROR; } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 1da582ab..06d1b9fa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -138,8 +138,8 @@ class SubscriptionData final : public std::enable_shared_from_this entity_; // A shared session std::shared_ptr sess_; - // An owned subscriber or querying_subscriber depending on the QoS settings. - std::optional, zenoh::ext::QueryingSubscriber>> sub_; + // An owned advanced subscriber. + std::optional> sub_; // Liveliness token for the subscription. std::optional token_; // Type support fields From ab081f2d3c1773186b064dc161db8cea3f2b3ca9 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 31 Dec 2024 19:13:42 +0800 Subject: [PATCH 2/8] chore: ament_uncrustify and ament_cpplint --- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 3 ++- rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index a17bc07d..44acbd15 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -113,7 +113,8 @@ std::shared_ptr PublisherData::make( // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - adv_pub_opts.cache = zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); + adv_pub_opts.cache = + zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 8bce00d8..0e23d3da 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -182,7 +182,8 @@ bool SubscriptionData::init() // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); - adv_sub_opts.history = zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); + adv_sub_opts.history = + zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); adv_sub_opts.history->detect_late_publishers = true; adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{}; @@ -216,7 +217,7 @@ bool SubscriptionData::init() get_system_time_in_ns(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); - }; + }; sub_ = context_impl->session()->ext().declare_advanced_subscriber( sub_ke, on_sample, From 05228d09c27a82dfbf42b23d38ee692f1a08c4b5 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Tue, 7 Jan 2025 16:48:06 +0800 Subject: [PATCH 3/8] deps(zenoh): bump up zenoh-c and zenoh-cpp to include the fix of undeclaration deadlock --- zenoh_cpp_vendor/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index 57c45fbb..f1ad5043 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -22,7 +22,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh/pull/1696 (Fix SHM Garbage Collection (GC) policy) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 61d8fcc136ce4ed36d921a32244da4f3d81a6097 + VCS_VERSION 85ca060fa4037239ca4102a3a61f96626cc6b434 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -36,7 +36,7 @@ ament_export_dependencies(zenohc) # - https://github.com/eclipse-zenoh/zenoh-cpp/pull/363 (Fix memory leak in string deserialization) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 05942637c29d3346ad18bab5a178aeebf4be5d62 + VCS_VERSION 8dce692942246448a9e64f3bae68793634792c33 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF ) From 0b51b3fe81bcab233d21450a62189173880b63d9 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Wed, 8 Jan 2025 23:03:00 +0800 Subject: [PATCH 4/8] deps(zenoh): bumup zenoh-cpp to include the fix to the memory leak https://github.com/eclipse-zenoh/zenoh-cpp/pull/363 --- zenoh_cpp_vendor/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index f1ad5043..e7ba14db 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -36,7 +36,7 @@ ament_export_dependencies(zenohc) # - https://github.com/eclipse-zenoh/zenoh-cpp/pull/363 (Fix memory leak in string deserialization) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 8dce692942246448a9e64f3bae68793634792c33 + VCS_VERSION 9ca8cc170e6880de65e3efe17699617299ce42f6 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF ) From 25c6d5518cdef818c4e4345269ea7f7ed209685a Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 9 Jan 2025 23:25:09 +0800 Subject: [PATCH 5/8] trigger CI From a31fd16e2ff0d624b7a59145358d210b69974b16 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 13 Jan 2025 16:08:50 +0800 Subject: [PATCH 6/8] fix: resolve out-of-sync --- zenoh_cpp_vendor/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index e7ba14db..57c45fbb 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -22,7 +22,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh/pull/1696 (Fix SHM Garbage Collection (GC) policy) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 85ca060fa4037239ca4102a3a61f96626cc6b434 + VCS_VERSION 61d8fcc136ce4ed36d921a32244da4f3d81a6097 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -36,7 +36,7 @@ ament_export_dependencies(zenohc) # - https://github.com/eclipse-zenoh/zenoh-cpp/pull/363 (Fix memory leak in string deserialization) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 9ca8cc170e6880de65e3efe17699617299ce42f6 + VCS_VERSION 05942637c29d3346ad18bab5a178aeebf4be5d62 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF ) From 448bb6c975b99c46ce911e5128dcf12519c4435c Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 13 Jan 2025 16:28:03 +0800 Subject: [PATCH 7/8] refactor: remove the unneeded querying callbacks --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 14 -------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 5 ----- 2 files changed, 19 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index f2f43034..cd4c13d0 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -384,20 +384,6 @@ void GraphCache::parse_put( // Otherwise, the entity represents a node that already exists in the graph. // Update topic info if required below. update_topic_maps_for_put(node_it->second, entity); - - // If the newly added entity is a publisher with transient_local qos durability, - // we trigger any registered querying subscriber callbacks. - if (entity->type() == liveliness::EntityType::Publisher && - entity->topic_info().has_value() && - entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) - { - auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_); - if (sub_cbs_it != querying_subs_cbs_.end()) { - for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) { - sub_it->second(entity->zid()); - } - } - } } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index b6bfd059..f808e5e8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -104,8 +104,6 @@ class GraphCache final /// @brief Signature for a function that will be invoked by the GraphCache when a QoS /// event is detected. using GraphCacheEventCallback = std::function; - /// Callback to be triggered when a publication cache is detected in the ROS Graph. - using QueryingSubscriberCallback = std::function; /// @brief Constructor /// @param id The id of the zenoh session that is building the graph cache. @@ -278,9 +276,6 @@ class GraphCache final using GraphEventCallbackMap = std::unordered_map; // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; - // Map key expressions to a map of sub keyexpr_hash and QueryingSubscriberCallback. - std::unordered_map> querying_subs_cbs_; std::mutex events_mutex_; // Mutex to lock before modifying the members above. From 9cc776bd8cc7fddfea8dec0ad703a22d3a6c96d3 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 13 Jan 2025 16:28:40 +0800 Subject: [PATCH 8/8] fix: reorder the advanced pub/sub options to match the ROS2 QoS --- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 6 ++++-- rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 44acbd15..573e846a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -108,11 +108,13 @@ std::shared_ptr PublisherData::make( } auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default(); - adv_pub_opts.publisher_detection = true; - adv_pub_opts.sample_miss_detection = true; // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + // Retransmission can only be done if history is enabled on subscriber side. + adv_pub_opts.publisher_detection = true; + // Allow this publisher to be detected through liveliness. + adv_pub_opts.sample_miss_detection = true; adv_pub_opts.cache = zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 0e23d3da..ae3b4c81 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -174,14 +174,16 @@ bool SubscriptionData::init() sess_ = context_impl->session(); auto adv_sub_opts = zenoh::ext::SessionExt::AdvancedSubscriberOptions::create_default(); - adv_sub_opts.subscriber_detection = true; // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + // Allow this subscriber to be detected through liveliness. + adv_sub_opts.subscriber_detection = true; adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); + // History can only be retransmitted by Publishers that enable caching. adv_sub_opts.history = zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); adv_sub_opts.history->detect_late_publishers = true;