diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index abadd967..cd4c13d0 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -384,20 +384,6 @@ void GraphCache::parse_put( // Otherwise, the entity represents a node that already exists in the graph. // Update topic info if required below. update_topic_maps_for_put(node_it->second, entity); - - // If the newly added entity is a publisher with transient_local qos durability, - // we trigger any registered querying subscriber callbacks. - if (entity->type() == liveliness::EntityType::Publisher && - entity->topic_info().has_value() && - entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) - { - auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_); - if (sub_cbs_it != querying_subs_cbs_.end()) { - for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) { - sub_it->second(entity->zid()); - } - } - } } ///============================================================================= @@ -1253,38 +1239,4 @@ void GraphCache::update_event_counters( } } } - -///============================================================================= -void GraphCache::set_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyxpr_hash, - QueryingSubscriberCallback cb) -{ - std::unordered_map< - std::string, - std::unordered_map - >::iterator cb_it = querying_subs_cbs_.find(sub_keyexpr); - if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[sub_keyexpr] = - std::unordered_map{}; - cb_it = querying_subs_cbs_.find(sub_keyexpr); - } - cb_it->second.insert(std::make_pair(sub_keyxpr_hash, std::move(cb))); -} - -///============================================================================= -void GraphCache::remove_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash) -{ - std::unordered_map< - std::string, - std::unordered_map - >::iterator cb_map_it = querying_subs_cbs_.find(sub_keyexpr); - if (cb_map_it == querying_subs_cbs_.end()) { - return; - } - cb_map_it->second.erase(sub_keyexpr_hash); -} - } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 22fffe20..f808e5e8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -104,8 +104,6 @@ class GraphCache final /// @brief Signature for a function that will be invoked by the GraphCache when a QoS /// event is detected. using GraphCacheEventCallback = std::function; - /// Callback to be triggered when a publication cache is detected in the ROS Graph. - using QueryingSubscriberCallback = std::function; /// @brief Constructor /// @param id The id of the zenoh session that is building the graph cache. @@ -189,15 +187,6 @@ class GraphCache final /// Returns true if the entity is a publisher or client. False otherwise. static bool is_entity_pub(const liveliness::Entity & entity); - void set_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash, - QueryingSubscriberCallback cb); - - void remove_querying_subscriber_callback( - const std::string & sub_keyexpr, - const std::size_t sub_keyexpr_hash); - private: // Helper function to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. @@ -287,9 +276,6 @@ class GraphCache final using GraphEventCallbackMap = std::unordered_map; // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; - // Map key expressions to a map of sub keyexpr_hash and QueryingSubscriberCallback. - std::unordered_map> querying_subs_cbs_; std::mutex events_mutex_; // Mutex to lock before modifying the members above. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f960b879..573e846a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -107,40 +107,38 @@ std::shared_ptr PublisherData::make( return nullptr; } - zenoh::ZResult result; - std::optional pub_cache; - zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); + auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default(); + // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - zenoh::ext::SessionExt::PublicationCacheOptions pub_cache_opts = - zenoh::ext::SessionExt::PublicationCacheOptions::create_default(); - - pub_cache_opts.history = adapted_qos_profile.depth; - pub_cache_opts.queryable_complete = true; - - std::string queryable_prefix = entity->zid(); - pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); - - pub_cache = session->ext().declare_publication_cache( - pub_ke, std::move(pub_cache_opts), &result); - - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); - return nullptr; - } + // Retransmission can only be done if history is enabled on subscriber side. + adv_pub_opts.publisher_detection = true; + // Allow this publisher to be detected through liveliness. + adv_pub_opts.sample_miss_detection = true; + adv_pub_opts.cache = + zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); + adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; } + zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Set congestion_control to BLOCK if appropriate. - zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + auto pub_opts = zenoh::Session::PublisherOptions::create_default(); + pub_opts.congestion_control = Z_CONGESTION_CONTROL_DROP; if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - opts.reliability = Z_RELIABILITY_RELIABLE; - + pub_opts.reliability = Z_RELIABILITY_RELIABLE; if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { - opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + pub_opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - auto pub = session->declare_publisher(pub_ke, std::move(opts), &result); + adv_pub_opts.publisher_options = pub_opts; + + zenoh::ZResult result; + auto adv_pub = session->ext().declare_advanced_publisher( + pub_ke, std::move(adv_pub_opts), &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } if (result != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); @@ -165,8 +163,7 @@ std::shared_ptr PublisherData::make( node, std::move(entity), std::move(session), - std::move(pub), - std::move(pub_cache), + std::move(adv_pub), std::move(token), type_support->data, std::move(message_type_support) @@ -179,8 +176,7 @@ PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr sess, - zenoh::Publisher pub, - std::optional pub_cache, + zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) @@ -189,7 +185,6 @@ PublisherData::PublisherData( entity_(std::move(entity)), sess_(std::move(sess)), pub_(std::move(pub)), - pub_cache_(std::move(pub_cache)), token_(std::move(token)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), @@ -253,8 +248,8 @@ rmw_ret_t PublisherData::publish( // will be encoded with CDR so it does not really matter. zenoh::ZResult result; int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); - auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = rmw_zenoh_cpp::AttachmentData( + auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); // TODO(ahcorde): shmbuf @@ -265,7 +260,7 @@ rmw_ret_t PublisherData::publish( TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp); - pub_.put(std::move(payload), std::move(options), &result); + pub_.put(std::move(payload), std::move(opts), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( @@ -301,8 +296,8 @@ rmw_ret_t PublisherData::publish_serialized_message( // will be encoded with CDR so it does not really matter. zenoh::ZResult result; int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); - auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = rmw_zenoh_cpp::AttachmentData( + auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); std::vector raw_data( @@ -312,7 +307,7 @@ rmw_ret_t PublisherData::publish_serialized_message( TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), serialized_message, source_timestamp); - pub_.put(std::move(payload), std::move(options), &result); + pub_.put(std::move(payload), std::move(opts), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 7cec5406..ee3d2e13 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -95,8 +95,7 @@ class PublisherData final const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr session, - zenoh::Publisher pub, - std::optional pub_cache, + zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -111,10 +110,8 @@ class PublisherData final std::shared_ptr entity_; // A shared session. std::shared_ptr sess_; - // An owned publisher. - zenoh::Publisher pub_; - // Optional publication cache when durability is transient_local. - std::optional pub_cache_; + // An owned AdvancedPublisher. + zenoh::ext::AdvancedPublisher pub_; // Liveliness token for the publisher. std::optional token_; // Type support fields diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 2a070ee8..ae3b4c81 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -173,151 +173,62 @@ bool SubscriptionData::init() sess_ = context_impl->session(); + auto adv_sub_opts = zenoh::ext::SessionExt::AdvancedSubscriberOptions::create_default(); + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - zenoh::ext::SessionExt::QueryingSubscriberOptions sub_options = - zenoh::ext::SessionExt::QueryingSubscriberOptions::create_default(); - const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - zenoh::KeyExpr selector_ke(selector); - sub_options.query_keyexpr = std::move(selector_ke); - // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. - // By default a query accepts only replies that matches its query selector. - // This allows us to selectively query certain PublicationCaches when defining the - // set_querying_subscriber_callback below. - sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY; - // As this initial query is now using a "*", the query target is not COMPLETE. - sub_options.query_target = Z_QUERY_TARGET_ALL; - // We set consolidation to none as we need to receive transient local messages - // from a number of publishers. Eg: To receive TF data published over /tf_static - // by various publishers. - sub_options.query_consolidation = - zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); - - std::weak_ptr data_wp = shared_from_this(); - auto sub = context_impl->session()->ext().declare_querying_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - std::string(sample.get_keyexpr().as_string_view()).c_str()); - return; - } - - auto attachment = sample.get_attachment(); - if (!attachment.has_value()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") - return; - } - - auto attachment_value = attachment.value(); - AttachmentData attachment_data(attachment_value); - - sub_data->add_new_message( - std::make_unique( - sample.get_payload(), - get_system_time_in_ns(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return false; - } - sub_ = std::move(sub); - - // Register the querying subscriber with the graph cache to get latest - // messages from publishers that were discovered after their first publication. - graph_cache_->set_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash(), - [data_wp](const std::string & queryable_prefix) -> void - { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); - return; - } - std::lock_guard lock(sub_data->mutex_); - - const std::string selector = queryable_prefix + - "/" + - sub_data->entity_->topic_info().value().topic_keyexpr_; - RMW_ZENOH_LOG_DEBUG_NAMED( + // Allow this subscriber to be detected through liveliness. + adv_sub_opts.subscriber_detection = true; + adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); + // History can only be retransmitted by Publishers that enable caching. + adv_sub_opts.history = + zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); + adv_sub_opts.history->detect_late_publishers = true; + adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; + adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{}; + adv_sub_opts.recovery->periodic_queries_period_ms = 1000; + } + + std::weak_ptr data_wp = shared_from_this(); + auto on_sample = [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "QueryingSubscriberCallback triggered over %s.", - selector.c_str() + "SubscriberCallback triggered over %s.", + std::string(sample.get_keyexpr().as_string_view()).c_str() ); - zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); - opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; - opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - - zenoh::ZResult result; - std::get>(sub_data->sub_.value()).get( - zenoh::KeyExpr(selector), - std::move(opts), - &result); - - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to get querying subscriber."); - return; - } + return; } - ); - } else { - zenoh::Session::SubscriberOptions sub_options = - zenoh::Session::SubscriberOptions::create_default(); - std::weak_ptr data_wp = shared_from_this(); - zenoh::Subscriber sub = context_impl->session()->declare_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); - return; - } - auto attachment = sample.get_attachment(); - if (!attachment.has_value()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") - return; - } - auto attachment_value = attachment.value(); - - AttachmentData attachment_data(attachment_value); - sub_data->add_new_message( - std::make_unique( - sample.get_payload(), - get_system_time_in_ns(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return false; - } - sub_ = std::move(sub); + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + auto attachment_value = attachment.value(); + + AttachmentData attachment_data(attachment_value); + sub_data->add_new_message( + std::make_unique( + sample.get_payload(), + get_system_time_in_ns(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }; + sub_ = context_impl->session()->ext().declare_advanced_subscriber( + sub_ke, + on_sample, + zenoh::closures::none, + std::move(adv_sub_opts), + &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return false; } // Publish to the graph that a new subscription is in town. @@ -391,11 +302,6 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } - // Remove the registered callback from the GraphCache if any. - graph_cache_->remove_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash() - ); // Remove any event callbacks registered to this subscription. graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); @@ -410,27 +316,12 @@ rmw_ret_t SubscriptionData::shutdown() } if (sub_.has_value()) { - zenoh::Subscriber * sub = std::get_if>(&sub_.value()); - if (sub != nullptr) { - std::move(*sub).undeclare(&result); - if (result != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare sub."); - return RMW_RET_ERROR; - } - } else { - zenoh::ext::QueryingSubscriber * sub = - std::get_if>(&sub_.value()); - if (sub != nullptr) { - std::move(*sub).undeclare(&result); - if (result != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare querying sub."); - return RMW_RET_ERROR; - } - } + std::move(sub_.value()).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare sub."); + return RMW_RET_ERROR; } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 1da582ab..06d1b9fa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -138,8 +138,8 @@ class SubscriptionData final : public std::enable_shared_from_this entity_; // A shared session std::shared_ptr sess_; - // An owned subscriber or querying_subscriber depending on the QoS settings. - std::optional, zenoh::ext::QueryingSubscriber>> sub_; + // An owned advanced subscriber. + std::optional> sub_; // Liveliness token for the subscription. std::optional token_; // Type support fields