From 565b97b0108a02ef41284629a6d226c053c0dd7e Mon Sep 17 00:00:00 2001 From: Juergen Gehring Date: Thu, 25 Jan 2018 00:40:08 -0800 Subject: [PATCH] vsomeip 2.10.8 --- CHANGES | 17 + CMakeLists.txt | 2 +- documentation/vsomeipUserGuide | 2 + .../configuration/include/policy.hpp | 6 + .../configuration/src/configuration_impl.cpp | 25 +- .../include/client_endpoint_impl.hpp | 7 +- .../endpoints/src/client_endpoint_impl.cpp | 20 +- .../src/local_client_endpoint_impl.cpp | 6 +- .../src/tcp_client_endpoint_impl.cpp | 10 +- .../src/udp_client_endpoint_impl.cpp | 7 +- .../routing/src/routing_manager_impl.cpp | 32 +- .../routing/src/routing_manager_proxy.cpp | 15 +- .../runtime/include/application_impl.hpp | 9 +- .../runtime/src/application_impl.cpp | 127 +++- .../include/service_discovery_impl.hpp | 33 +- .../src/service_discovery_impl.cpp | 681 +++++++++++------- .../subscribe_notify_one_test_service.cpp | 4 - .../subscribe_notify_test_service.cpp | 4 - 18 files changed, 658 insertions(+), 349 deletions(-) diff --git a/CHANGES b/CHANGES index 8f31b84a4..3e6bc6e35 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,23 @@ Changes ======= +v2.10.8 +- Change dispatching of availability states in case an availability + handler of a service instance is blocked in user code: Availability + states of a service instance are now never dispatched parallel. The + next availability state for a service instance is only dispatched + after the blocked availability handler returned from user code. If + the availability of the service instance changes in the meantime, + subsequent incoming messages of the service instance are queued + until the availability change was reported to the user code. +- Subscriptions to remotely offered services are now always done based + on the protocol(s) the remote service is offered with. The + subscription_type parameter of the application::subscribe method is + ignored. +- Added wildcard support ("any") for the uid and gid json parameters + in the security configuration. +- Fix possible deadlock on application shutdown + v2.10.7 - Fix potential deadlock when expiring remote subscriptions - Rework restarting of tcp client endpoints to prevent heap corruption diff --git a/CMakeLists.txt b/CMakeLists.txt index cce0a681e..281530248 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 10) -set (VSOMEIP_PATCH_VERSION 7) +set (VSOMEIP_PATCH_VERSION 8) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) diff --git a/documentation/vsomeipUserGuide b/documentation/vsomeipUserGuide index 083669e21..395fdd573 100644 --- a/documentation/vsomeipUserGuide +++ b/documentation/vsomeipUserGuide @@ -1035,10 +1035,12 @@ For remote clients this entry should be skipped. **** 'uid' + Specifies the LINUX user id of the above client(s) as decimal number. +As a wildcard "any" can be used. **** 'gid' + Specifies the LINUX group id of the above client(s) as decimal number. +As a wildcard "any" can be used. *** 'allow/deny' + diff --git a/implementation/configuration/include/policy.hpp b/implementation/configuration/include/policy.hpp index e94d7b56e..89e8a9637 100644 --- a/implementation/configuration/include/policy.hpp +++ b/implementation/configuration/include/policy.hpp @@ -14,12 +14,18 @@ namespace vsomeip { namespace cfg { struct policy { + policy() : + uid_(0), is_uid_set_(false), gid_(0), is_gid_set_(false) { + } + std::set> allowed_services_; std::set> allowed_offers_; std::set> denied_services_; std::set> denied_offers_; std::uint32_t uid_; + bool is_uid_set_; std::uint32_t gid_; + bool is_gid_set_; bool allow_; }; diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 0920b66c5..178481c5b 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -1667,7 +1667,7 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) { } } if (overrides) { - VSOMEIP_WARNING << std::hex << "Security configuration: " + VSOMEIP_INFO << std::hex << "Security configuration: " << "Client range 0x" << firstClient << " - 0x" << lastClient << " overrides policy of " << std::dec << overrides << " clients"; @@ -1684,31 +1684,33 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) { its_converter >> client; if (client != 0x0) { if (policies_.find(client) != policies_.end()) { - VSOMEIP_WARNING << std::hex << "Security configuration: " + VSOMEIP_INFO << std::hex << "Security configuration: " << "Overriding policy for client 0x" << client << "."; } policies_[client] = policy; } } } else if (i->first == "credentials") { - uint32_t uid = 0x0; - uint32_t gid = 0x0; for (auto n = i->second.begin(); n != i->second.end(); ++n) { if (n->first == "uid") { std::stringstream its_converter; std::string value = n->second.data(); - its_converter << std::dec << value; - its_converter >> uid; + if (value != "any") { + its_converter << std::dec << value; + its_converter >> policy->uid_ ; + policy->is_uid_set_ = true; + } } else if (n->first == "gid") { std::stringstream its_converter; std::string value = n->second.data(); - its_converter << std::dec << value; - its_converter >> gid; + if (value != "any") { + its_converter << std::dec << value; + its_converter >> policy->gid_ ; + policy->is_gid_set_ = true; + } } } - policy->uid_ = uid; - policy->gid_ = gid; } else if (i->first == "allow") { if (allow_deny_set) { VSOMEIP_WARNING << "Security configuration: \"allow\" tag overrides " @@ -2410,7 +2412,8 @@ bool configuration_impl::check_credentials(client_t _client, uint32_t _uid, } auto its_client = policies_.find(_client); if (its_client != policies_.end()) { - if (its_client->second->uid_ == _uid && its_client->second->gid_ == _gid) { + if ((!its_client->second->is_uid_set_ || its_client->second->uid_ == _uid) + && (!its_client->second->is_gid_set_ || its_client->second->gid_ == _gid)) { return true; } } diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index d8c758dbf..b72ea7e22 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -70,6 +70,11 @@ class client_endpoint_impl: public endpoint_impl, public client_endpoi virtual void print_status() = 0; protected: + enum class cei_state_e : std::uint8_t { + CLOSED, + CONNECTING, + ESTABLISHED + }; virtual void send_queued() = 0; void shutdown_and_close_socket(bool _recreate_socket); void shutdown_and_close_socket_unlocked(bool _recreate_socket); @@ -84,7 +89,7 @@ class client_endpoint_impl: public endpoint_impl, public client_endpoi std::mutex connect_timer_mutex_; boost::asio::steady_timer connect_timer_; std::atomic connect_timeout_; - std::atomic is_connected_; + std::atomic state_; // send data message_buffer_ptr_t packetizer_; diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 55316a9ee..be8e68476 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -37,7 +37,7 @@ client_endpoint_impl::client_endpoint_impl( socket_(new socket_type(_io)), remote_(_remote), flush_timer_(_io), connect_timer_(_io), connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable - is_connected_(false), + state_(cei_state_e::CLOSED), packetizer_(std::make_shared()), queue_size_(0), was_not_connected_(false), @@ -55,11 +55,16 @@ bool client_endpoint_impl::is_client() const { template bool client_endpoint_impl::is_connected() const { - return is_connected_; + return state_ == cei_state_e::ESTABLISHED; } template -void client_endpoint_impl::set_connected(bool _connected) { is_connected_ = _connected; } +void client_endpoint_impl::set_connected(bool _connected) { + if (_connected) { + state_ = cei_state_e::ESTABLISHED; + } else { + state_ = cei_state_e::CLOSED; + } } template void client_endpoint_impl::stop() { { std::lock_guard its_lock(mutex_); @@ -222,8 +227,8 @@ void client_endpoint_impl::connect_cbk( if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT) connect_timeout_ = (connect_timeout_ << 1); - if (is_connected_) { - is_connected_ = false; + if (state_ != cei_state_e::ESTABLISHED) { + state_ = cei_state_e::CLOSED; its_host->on_disconnect(this->shared_from_this()); } } else { @@ -233,7 +238,7 @@ void client_endpoint_impl::connect_cbk( } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable set_local_port(); - if (!is_connected_) { + if (state_ != cei_state_e::ESTABLISHED) { its_host->on_connect(this->shared_from_this()); } @@ -270,7 +275,7 @@ void client_endpoint_impl::send_cbk( send_queued(); } } else if (_error == boost::asio::error::broken_pipe) { - is_connected_ = false; + state_ = cei_state_e::CLOSED; bool stopping(false); { std::lock_guard its_lock(mutex_); @@ -319,6 +324,7 @@ void client_endpoint_impl::send_cbk( connect(); } else if (_error == boost::asio::error::not_connected || _error == boost::asio::error::bad_descriptor) { + state_ = cei_state_e::CLOSED; was_not_connected_ = true; shutdown_and_close_socket(true); connect(); diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 009ac9f08..b84cb0aab 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -46,7 +46,10 @@ bool local_client_endpoint_impl::is_local() const { } void local_client_endpoint_impl::restart() { - is_connected_ = false; + if (state_ == cei_state_e::CONNECTING) { + return; + } + state_ = cei_state_e::CONNECTING; { std::lock_guard its_lock(mutex_); sending_blocked_ = false; @@ -113,6 +116,7 @@ void local_client_endpoint_impl::connect() { VSOMEIP_WARNING << "local_client_endpoint_impl::connect: " << "couldn't enable SO_REUSEADDR: " << its_error.message(); } + state_ = cei_state_e::CONNECTING; socket_->connect(remote_, its_connect_error); // Credentials diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 143b95aa7..b1a2f7416 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -61,7 +61,10 @@ void tcp_client_endpoint_impl::start() { } void tcp_client_endpoint_impl::restart() { - is_connected_ = false; + if (state_ == cei_state_e::CONNECTING) { + return; + } + state_ = cei_state_e::CONNECTING; std::string address_port_local; { std::lock_guard its_lock(socket_mutex_); @@ -137,7 +140,7 @@ void tcp_client_endpoint_impl::connect() { "Error binding socket: " << its_bind_error.message(); } } - + state_ = cei_state_e::CONNECTING; socket_->async_connect( remote_, std::bind( @@ -180,7 +183,6 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, _recv_buffer->resize(its_required_capacity, 0x0); } buffer_size = _missing_capacity; - _missing_capacity = 0; } else if (buffer_shrink_threshold_ && shrink_count_ > buffer_shrink_threshold_ && _recv_buffer_size == 0) { @@ -517,7 +519,7 @@ void tcp_client_endpoint_impl::receive_cbk( << _error.message() << "( " << std::dec << _error.value() << ") local: " << get_address_port_local() << " remote: " << get_address_port_remote(); - is_connected_ = false; + state_ = cei_state_e::CLOSED; shutdown_and_close_socket_unlocked(false); } else { VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 6bc0bf599..6252b40de 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -51,7 +51,7 @@ void udp_client_endpoint_impl::connect() { "Error binding socket: " << its_bind_error.message(); } } - + state_ = cei_state_e::CONNECTING; socket_->async_connect( remote_, std::bind( @@ -77,7 +77,10 @@ void udp_client_endpoint_impl::start() { } void udp_client_endpoint_impl::restart() { - is_connected_ = false; + if (state_ == cei_state_e::CONNECTING) { + return; + } + state_ = cei_state_e::CONNECTING; { std::lock_guard its_lock(mutex_); queue_.clear(); diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 0c443fda7..2acd4eb35 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -3204,32 +3204,14 @@ void routing_manager_impl::on_identify_response(client_t _client, service_t _ser void routing_manager_impl::identify_for_subscribe(client_t _client, service_t _service, instance_t _instance, major_version_t _major, subscription_type_e _subscription_type) { - if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE - || _subscription_type == subscription_type_e::SU_PREFER_UNRELIABLE - || _subscription_type == subscription_type_e::SU_UNRELIABLE) { - if (!has_identified(_client, _service, _instance, false) - && !is_identifying(_client, _service, _instance, false)) { - if (!send_identify_message(_client, _service, _instance, _major, - false) && _subscription_type - == subscription_type_e::SU_PREFER_UNRELIABLE) { - send_identify_message(_client, _service, _instance, _major, - true); - } - } + (void)_subscription_type; + if (!has_identified(_client, _service, _instance, false) + && !is_identifying(_client, _service, _instance, false)) { + send_identify_message(_client, _service, _instance, _major, false); } - - if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE - || _subscription_type == subscription_type_e::SU_PREFER_RELIABLE - || _subscription_type == subscription_type_e::SU_RELIABLE) { - if (!has_identified(_client, _service, _instance, true) - && !is_identifying(_client, _service, _instance, true)) { - if (!send_identify_message(_client, _service, _instance, _major, - true) && _subscription_type - == subscription_type_e::SU_PREFER_RELIABLE) { - send_identify_message(_client, _service, _instance, _major, - false); - } - } + if (!has_identified(_client, _service, _instance, true) + && !is_identifying(_client, _service, _instance, true)) { + send_identify_message(_client, _service, _instance, _major, true); } } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index a15ae2da8..f6aa420bc 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -363,12 +363,15 @@ void routing_manager_proxy::register_event(client_t _client, _event, _eventgroups, _is_field, _is_provided); } } - VSOMEIP_INFO << "REGISTER EVENT(" - << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _event - << ":is_provider=" << _is_provided << "]"; + + if(_is_provided) { + VSOMEIP_INFO << "REGISTER EVENT(" + << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _event + << ":is_provider=" << _is_provided << "]"; + } } void routing_manager_proxy::unregister_event(client_t _client, diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp index 2d6cd8221..efd7d7df6 100644 --- a/implementation/runtime/include/application_impl.hpp +++ b/implementation/runtime/include/application_impl.hpp @@ -203,7 +203,6 @@ class application_impl: public application, sync_handler(std::function _handler) : handler_(_handler), - is_dispatching_(false), service_id_(ANY_SERVICE), instance_id_(ANY_INSTANCE), method_id_(ANY_METHOD), @@ -215,7 +214,6 @@ class application_impl: public application, method_t _method_id, session_t _session_id, eventgroup_t _eventgroup_id, handler_type_e _handler_type) : handler_(nullptr), - is_dispatching_(false), service_id_(_service_id), instance_id_(_instance_id), method_id_(_method_id), @@ -224,7 +222,6 @@ class application_impl: public application, handler_type_(_handler_type) { } std::function handler_; - bool is_dispatching_; service_t service_id_; instance_t instance_id_; method_t method_id_; @@ -268,6 +265,8 @@ class application_impl: public application, void main_dispatch(); void dispatch(); void invoke_handler(std::shared_ptr &_handler); + std::shared_ptr get_next_handler(); + void reschedule_availability_handler(const std::shared_ptr &_handler); bool has_active_dispatcher(); bool is_active_dispatcher(const std::thread::id &_id); void remove_elapsed_dispatchers(); @@ -372,6 +371,7 @@ class application_impl: public application, std::set running_dispatchers_; // Mutex to protect access to dispatchers_ & elapsed_dispatchers_ std::mutex dispatcher_mutex_; + // Condition to wakeup the dispatcher thread mutable std::condition_variable dispatcher_condition_; std::size_t max_dispatchers_; @@ -419,6 +419,9 @@ class application_impl: public application, bool client_side_logging_; std::set > client_side_logging_filter_; + + std::map, + std::deque > > availability_handlers_; }; } // namespace vsomeip diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index d97be8e58..720ddb168 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -1557,13 +1557,18 @@ void application_impl::main_dispatch() { dispatcher_condition_.wait(its_lock); } } else { - while (is_dispatching_ && !handlers_.empty() && is_active_dispatcher(its_id)) { - std::shared_ptr its_handler = handlers_.front(); - handlers_.pop_front(); + std::shared_ptr its_handler; + while (is_dispatching_ && is_active_dispatcher(its_id) + && (its_handler = get_next_handler())) { its_lock.unlock(); invoke_handler(its_handler); + + if (!is_dispatching_) + return; + its_lock.lock(); + reschedule_availability_handler(its_handler); remove_elapsed_dispatchers(); #ifdef _WIN32 @@ -1596,14 +1601,18 @@ void application_impl::dispatch() { return; } } else { - while (is_dispatching_ && !handlers_.empty() - && is_active_dispatcher(its_id)) { - std::shared_ptr its_handler = handlers_.front(); - handlers_.pop_front(); + std::shared_ptr its_handler; + while (is_dispatching_ && is_active_dispatcher(its_id) + && (its_handler = get_next_handler())) { its_lock.unlock(); invoke_handler(its_handler); + + if (!is_dispatching_) + return; + its_lock.lock(); + reschedule_availability_handler(its_handler); remove_elapsed_dispatchers(); } } @@ -1616,6 +1625,72 @@ void application_impl::dispatch() { dispatcher_condition_.notify_all(); } +std::shared_ptr application_impl::get_next_handler() { + std::shared_ptr its_next_handler; + while (!handlers_.empty() && !its_next_handler) { + its_next_handler = handlers_.front(); + handlers_.pop_front(); + + // Check handler + if (its_next_handler->handler_type_ == handler_type_e::AVAILABILITY) { + const std::pair its_si_pair = std::make_pair( + its_next_handler->service_id_, + its_next_handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end() + && !found_si->second.empty() + && found_si->second.front() != its_next_handler) { + found_si->second.push_back(its_next_handler); + // There is a running availability handler for this service. + // Therefore, this one must wait... + its_next_handler = nullptr; + } else { + availability_handlers_[its_si_pair].push_back(its_next_handler); + } + } else if (its_next_handler->handler_type_ == handler_type_e::MESSAGE) { + const std::pair its_si_pair = std::make_pair( + its_next_handler->service_id_, + its_next_handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end() + && found_si->second.size() > 1) { + // The message comes after the next availability handler + // Therefore, queue it to the last one + found_si->second.push_back(its_next_handler); + its_next_handler = nullptr; + } + } + } + + return its_next_handler; +} + +void application_impl::reschedule_availability_handler( + const std::shared_ptr &_handler) { + if (_handler->handler_type_ == handler_type_e::AVAILABILITY) { + const std::pair its_si_pair = std::make_pair( + _handler->service_id_, _handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end()) { + if (!found_si->second.empty() + && found_si->second.front() == _handler) { + found_si->second.pop_front(); + + // If there are other availability handlers pending, schedule + // them and all handlers that were queued because of them + for (auto it = found_si->second.rbegin(); + it != found_si->second.rend(); it++) { + handlers_.push_front(*it); + } + availability_handlers_.erase(found_si); + } + return; + } + VSOMEIP_WARNING << __func__ + << ": An unknown availability handler returned!"; + } +} + void application_impl::invoke_handler(std::shared_ptr &_handler) { const std::thread::id its_id = std::this_thread::get_id(); @@ -1669,11 +1744,17 @@ void application_impl::invoke_handler(std::shared_ptr &_handler) { << "type=" << static_cast(its_sync_handler->handler_type_) << " thread=" << std::hex << its_id; } - if (is_dispatching_) { - { - std::lock_guard its_lock(dispatcher_mutex_); + + while (is_dispatching_ ) { + if (dispatcher_mutex_.try_lock()) { running_dispatchers_.insert(its_id); + dispatcher_mutex_.unlock(); + break; } + std::this_thread::yield(); + } + + if (is_dispatching_) { try { _handler->handler_(); } catch (const std::exception &e) { @@ -1684,9 +1765,14 @@ void application_impl::invoke_handler(std::shared_ptr &_handler) { } boost::system::error_code ec; its_dispatcher_timer.cancel(ec); - if (is_dispatching_) { - std::lock_guard its_lock(dispatcher_mutex_); - running_dispatchers_.erase(its_id); + + while (is_dispatching_ ) { + if (dispatcher_mutex_.try_lock()) { + running_dispatchers_.erase(its_id); + dispatcher_mutex_.unlock(); + return; + } + std::this_thread::yield(); } } @@ -1801,6 +1887,7 @@ void application_impl::shutdown() { its_dispatcher.second->detach(); } } + availability_handlers_.clear(); running_dispatchers_.clear(); elapsed_dispatchers_.clear(); dispatchers_.clear(); @@ -2033,13 +2120,13 @@ bool application_impl::check_subscription_state(service_t _service, instance_t _ void application_impl::print_blocking_call(std::shared_ptr _handler) { switch (_handler->handler_type_) { case handler_type_e::AVAILABILITY: - VSOMEIP_INFO << "BLOCKING CALL AVAILABILITY(" + VSOMEIP_WARNING << "BLOCKING CALL AVAILABILITY(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "]"; break; case handler_type_e::MESSAGE: - VSOMEIP_INFO << "BLOCKING CALL MESSAGE(" + VSOMEIP_WARNING << "BLOCKING CALL MESSAGE(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "." @@ -2047,11 +2134,11 @@ void application_impl::print_blocking_call(std::shared_ptr _handle << std::hex << std::setw(4) << std::setfill('0') << _handler->session_id_ << "]"; break; case handler_type_e::STATE: - VSOMEIP_INFO << "BLOCKING CALL STATE(" + VSOMEIP_WARNING << "BLOCKING CALL STATE(" << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")"; break; case handler_type_e::SUBSCRIPTION: - VSOMEIP_INFO << "BLOCKING CALL SUBSCRIPTION(" + VSOMEIP_WARNING << "BLOCKING CALL SUBSCRIPTION(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "." @@ -2059,15 +2146,15 @@ void application_impl::print_blocking_call(std::shared_ptr _handle << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << "]"; break; case handler_type_e::OFFERED_SERVICES_INFO: - VSOMEIP_INFO << "BLOCKING CALL OFFERED_SERVICES_INFO(" + VSOMEIP_WARNING << "BLOCKING CALL OFFERED_SERVICES_INFO(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")"; break; case handler_type_e::WATCHDOG: - VSOMEIP_INFO << "BLOCKING CALL WATCHDOG(" + VSOMEIP_WARNING << "BLOCKING CALL WATCHDOG(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")"; break; case handler_type_e::UNKNOWN: - VSOMEIP_INFO << "BLOCKING CALL UNKNOWN(" + VSOMEIP_WARNING << "BLOCKING CALL UNKNOWN(" << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")"; break; } diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 020de5933..039fe7bf6 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -122,12 +122,20 @@ class service_discovery_impl: public service_discovery, service_t _service, instance_t _instance, const std::shared_ptr &_info, uint32_t &_size); - void insert_subscription(std::shared_ptr &_message, + enum remote_offer_type_e : std::uint8_t { + RELIABLE_UNRELIABLE, + RELIABLE, + UNRELIABLE, + UNKNOWN = 0xff + }; + bool insert_subscription(std::shared_ptr &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr &_subscription, bool _insert_reliable, bool _insert_unreliable); - void insert_nack_subscription_on_resubscribe(std::shared_ptr &_message, + std::shared_ptr &_subscription, + remote_offer_type_e _offer_type); + bool insert_nack_subscription_on_resubscribe(std::shared_ptr &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr &_subscription); + std::shared_ptr &_subscription, + remote_offer_type_e _offer_type); void insert_subscription_ack(std::shared_ptr &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr &_info, ttl_t _ttl, @@ -206,8 +214,7 @@ class service_discovery_impl: public service_discovery, const std::shared_ptr &_message) const; bool check_layer_four_protocol( const std::shared_ptr _ip_option) const; - void get_subscription_endpoints(subscription_type_e _subscription_type, - std::shared_ptr& _unreliable, + void get_subscription_endpoints(std::shared_ptr& _unreliable, std::shared_ptr& _reliable, boost::asio::ip::address* _address, bool* _has_address, @@ -322,6 +329,16 @@ class service_discovery_impl: public service_discovery, void on_last_msg_received_timer_expired(const boost::system::error_code &_error); void stop_last_msg_received_timer(); + + remote_offer_type_e get_remote_offer_type(service_t _service, instance_t _instance); + bool update_remote_offer_type(service_t _service, instance_t _instance, + remote_offer_type_e _offer_type, + const boost::asio::ip::address &_reliable_address, + const boost::asio::ip::address &_unreliable_address); + void remove_remote_offer_type(service_t _service, instance_t _instance, + const boost::asio::ip::address &_address); + void remove_remote_offer_type_by_ip(const boost::asio::ip::address &_address); + private: boost::asio::io_service &io_; service_discovery_host *host_; @@ -417,6 +434,10 @@ class service_discovery_impl: public service_discovery, std::mutex last_msg_received_timer_mutex_; boost::asio::steady_timer last_msg_received_timer_; std::chrono::milliseconds last_msg_received_timer_timeout_; + + std::mutex remote_offer_types_mutex_; + std::map, remote_offer_type_e> remote_offer_types_; + std::map>> remote_offers_by_ip_; }; } // namespace sd diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index d617e0cd4..2d5aafff8 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -258,7 +258,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, bool has_address(false); boost::asio::ip::address its_address; - get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable, + get_subscription_endpoints(its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, _client); std::shared_ptr its_runtime = runtime_.lock(); @@ -286,41 +286,47 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } } - if (its_subscription->get_endpoint(true) && + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); + + if (its_offer_type == remote_offer_type_e::UNRELIABLE && + !its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(true)->is_connected() && - its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(false)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, true); + its_subscription, its_offer_type); } else { - if (!its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->set_tcp_connection_established(false); - } - if (!its_subscription->get_endpoint(false)->is_connected()) { - its_subscription->set_udp_connection_established(false); - } + its_subscription->set_udp_connection_established(false); } - } else if (its_subscription->get_endpoint(true) && + } else if (its_offer_type == remote_offer_type_e::RELIABLE && + its_subscription->get_endpoint(true) && !its_subscription->get_endpoint(false)) { if (its_subscription->get_endpoint(true)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, false); + its_subscription, its_offer_type); } else { its_subscription->set_tcp_connection_established(false); } - } else if (!its_subscription->get_endpoint(true) && + } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && + its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(true)->is_connected() && + its_subscription->get_endpoint(false)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, false, true); + its_subscription, its_offer_type); } else { - its_subscription->set_udp_connection_established(false); + if (!its_subscription->get_endpoint(true)->is_connected()) { + its_subscription->set_tcp_connection_established(false); + } + if (!its_subscription->get_endpoint(false)->is_connected()) { + its_subscription->set_udp_connection_established(false); + } } } @@ -336,107 +342,30 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } void service_discovery_impl::get_subscription_endpoints( - subscription_type_e _subscription_type, std::shared_ptr& _unreliable, std::shared_ptr& _reliable, boost::asio::ip::address* _address, bool* _has_address, service_t _service, instance_t _instance, client_t _client) const { - switch (_subscription_type) { - case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: - _reliable = host_->find_or_create_remote_client(_service, _instance, - true, _client); - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( + _reliable = host_->find_or_create_remote_client(_service, _instance, + true, _client); + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); + if (_unreliable) { + std::shared_ptr its_client_endpoint = + std::dynamic_pointer_cast(_unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } + } + if (_reliable) { + std::shared_ptr its_client_endpoint = + std::dynamic_pointer_cast(_reliable); + if (its_client_endpoint) { + *_has_address = *_has_address + || its_client_endpoint->get_remote_address( *_address); - } - } - if (_reliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_reliable); - if (its_client_endpoint) { - *_has_address = *_has_address - || its_client_endpoint->get_remote_address( - *_address); - } - } - break; - case subscription_type_e::SU_PREFER_UNRELIABLE: - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } else { - _reliable = host_->find_or_create_remote_client(_service, - _instance, true, _client); - if (_reliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast( - _reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - } - break; - case subscription_type_e::SU_PREFER_RELIABLE: - _reliable = host_->find_or_create_remote_client(_service, - _instance, true, _client); - if (_reliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } else { - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast( - _unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - } - break; - case subscription_type_e::SU_UNRELIABLE: - _unreliable = host_->find_or_create_remote_client(_service, - _instance, - false, _client); - if (_unreliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - break; - case subscription_type_e::SU_RELIABLE: - _reliable = host_->find_or_create_remote_client(_service, _instance, - true, _client); - if (_reliable) { - std::shared_ptr its_client_endpoint = - std::dynamic_pointer_cast(_reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } + } } } @@ -456,6 +385,8 @@ void service_discovery_impl::unsubscribe(service_t _service, if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto found_client = found_eventgroup->second.find(_client); @@ -488,8 +419,8 @@ void service_discovery_impl::unsubscribe(service_t _service, return; } } - insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, true); + insert_subscription(its_message, _service, _instance, + _eventgroup, its_subscription, its_offer_type); } } } @@ -535,6 +466,8 @@ void service_discovery_impl::unsubscribe_client(service_t _service, if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); for (auto &found_eventgroup : found_instance->second) { auto found_client = found_eventgroup.second.find(_client); if (found_client != found_eventgroup.second.end()) { @@ -570,8 +503,7 @@ void service_discovery_impl::unsubscribe_client(service_t _service, } } insert_subscription(its_message, _service, _instance, - found_eventgroup.first, its_subscription, true, - true); + found_eventgroup.first, its_subscription, its_offer_type); } } } @@ -895,117 +827,207 @@ void service_discovery_impl::insert_offer_entries( _done = true; } -void service_discovery_impl::insert_subscription( +bool service_discovery_impl::insert_subscription( std::shared_ptr &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr &_subscription, - bool _insert_reliable, bool _insert_unreliable) { - if((_insert_reliable && !_insert_unreliable && !_subscription->get_endpoint(true)) || - (_insert_unreliable && !_insert_reliable && !_subscription->get_endpoint(false))) { - // don't create an eventgroup entry if there isn't an endpoint option - // to insert - return; - } - std::shared_ptr < eventgroupentry_impl > its_entry = - _message->create_eventgroup_entry(); - its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_entry->set_service(_service); - its_entry->set_instance(_instance); - its_entry->set_eventgroup(_eventgroup); - its_entry->set_counter(_subscription->get_counter()); - its_entry->set_major_version(_subscription->get_major()); - its_entry->set_ttl(_subscription->get_ttl()); - std::shared_ptr < endpoint > its_endpoint; - if (_insert_reliable) { - its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); - if (its_port) { - insert_option(_message, its_entry, unicast_, its_port, true); - } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - "local reliable port is zero: [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + remote_offer_type_e _offer_type) { + bool ret(false); + std::shared_ptr its_reliable_endpoint(_subscription->get_endpoint(true)); + std::shared_ptr its_unreliable_endpoint(_subscription->get_endpoint(false)); + + bool insert_reliable(false); + bool insert_unreliable(false); + switch (_offer_type) { + case remote_offer_type_e::RELIABLE: + if (its_reliable_endpoint) { + insert_reliable = true; } - } - } - if (_insert_unreliable) { - its_endpoint = _subscription->get_endpoint(false); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); - if (its_port) { - insert_option(_message, its_entry, unicast_, its_port, false); - } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - " local unreliable port is zero: [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + break; + case remote_offer_type_e::UNRELIABLE: + if (its_unreliable_endpoint) { + insert_unreliable = true; } - } + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + if (its_reliable_endpoint && its_unreliable_endpoint) { + insert_reliable = true; + insert_unreliable = true; + } + break; + default: + break; } -} - -void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr &_message, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr &_subscription) { - // SIP_SD_844: - // This method is used for not acknowledged subscriptions on renew subscription - // Two entries: Stop subscribe & subscribe within one SD-Message - // One option: Both entries reference it - - std::shared_ptr < eventgroupentry_impl > its_stop_entry = - _message->create_eventgroup_entry(); - its_stop_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_stop_entry->set_service(_service); - its_stop_entry->set_instance(_instance); - its_stop_entry->set_eventgroup(_eventgroup); - its_stop_entry->set_counter(_subscription->get_counter()); - its_stop_entry->set_major_version(_subscription->get_major()); - its_stop_entry->set_ttl(0); - - std::shared_ptr < eventgroupentry_impl > its_entry = - _message->create_eventgroup_entry(); - its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_entry->set_service(_service); - its_entry->set_instance(_instance); - its_entry->set_eventgroup(_eventgroup); - its_entry->set_counter(_subscription->get_counter()); - its_entry->set_major_version(_subscription->get_major()); - its_entry->set_ttl(_subscription->get_ttl()); - - std::shared_ptr < endpoint > its_endpoint; - its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint && its_endpoint->is_connected()) { - const std::uint16_t its_port = its_endpoint->get_local_port(); + if (!insert_reliable && !insert_unreliable) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "subscription doesn't match offer type: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " + << _offer_type; + return false; + } + std::shared_ptr its_entry; + if (insert_reliable && its_reliable_endpoint) { + const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); if (its_port) { - insert_option(_message, its_stop_entry, unicast_, its_port, true); + its_entry = _message->create_eventgroup_entry(); + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_subscription->get_ttl()); insert_option(_message, its_entry, unicast_, its_port, true); + ret = true; } else { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " "local reliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + ret = false; } } - its_endpoint = _subscription->get_endpoint(false); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); + if (insert_unreliable && its_unreliable_endpoint) { + const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); if (its_port) { - insert_option(_message, its_stop_entry, unicast_, its_port, false); + if (!its_entry) { + its_entry = _message->create_eventgroup_entry(); + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_subscription->get_ttl()); + } insert_option(_message, its_entry, unicast_, its_port, false); + ret = true; } else { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - "local unreliable port is zero: [" + " local unreliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + ret = false; } } + return ret; +} + +bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr &_message, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr &_subscription, remote_offer_type_e _offer_type) { + bool ret(false); + // SIP_SD_844: + // This method is used for not acknowledged subscriptions on renew subscription + // Two entries: Stop subscribe & subscribe within one SD-Message + // One option: Both entries reference it + + const std::function(ttl_t)> insert_entry + = [&](ttl_t _ttl) { + std::shared_ptr its_entry = + _message->create_eventgroup_entry(); + // SUBSCRIBE_EVENTGROUP and STOP_SUBSCRIBE_EVENTGROUP are identical + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_ttl); + return its_entry; + }; + + std::shared_ptr its_reliable_endpoint(_subscription->get_endpoint(true)); + std::shared_ptr its_unreliable_endpoint(_subscription->get_endpoint(false)); + + if (_offer_type == remote_offer_type_e::UNRELIABLE && + !its_reliable_endpoint && its_unreliable_endpoint) { + if (its_unreliable_endpoint->is_connected()) { + const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); + if (its_port) { + std::shared_ptr its_stop_entry = insert_entry(0); + std::shared_ptr its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_port, false); + insert_option(_message, its_entry, unicast_, its_port, false); + ret = true; + } else { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local unreliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + _subscription->set_udp_connection_established(false); + } + } else if (_offer_type == remote_offer_type_e::RELIABLE && + its_reliable_endpoint && !its_unreliable_endpoint) { + if (its_reliable_endpoint->is_connected()) { + const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); + if (its_port) { + std::shared_ptr its_stop_entry = insert_entry(0); + std::shared_ptr its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_port, true); + insert_option(_message, its_entry, unicast_, its_port, true); + ret = true; + } else { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local reliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + _subscription->set_tcp_connection_established(false); + } + } else if (_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && + its_reliable_endpoint && its_unreliable_endpoint) { + if (its_reliable_endpoint->is_connected() && + its_unreliable_endpoint->is_connected()) { + const std::uint16_t its_reliable_port = its_reliable_endpoint->get_local_port(); + const std::uint16_t its_unreliable_port = its_unreliable_endpoint->get_local_port(); + if (its_reliable_port && its_unreliable_port) { + std::shared_ptr its_stop_entry = insert_entry(0); + std::shared_ptr its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_reliable_port, true); + insert_option(_message, its_entry, unicast_, its_reliable_port, true); + insert_option(_message, its_stop_entry, unicast_, its_unreliable_port, false); + insert_option(_message, its_entry, unicast_, its_unreliable_port, false); + ret = true; + } else if (!its_reliable_port) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local reliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } else if (!its_unreliable_port) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local unreliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + if (!its_reliable_endpoint->is_connected()) { + _subscription->set_tcp_connection_established(false); + } + if (!its_unreliable_endpoint->is_connected()) { + _subscription->set_udp_connection_established(false); + } + } + } else { + VSOMEIP_WARNING << __func__ << ": Couldn't insert StopSubscribe/Subscribe [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " + << _offer_type; + } + return ret; } void service_discovery_impl::insert_subscription_ack( @@ -1151,6 +1173,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, if (is_reboot(_sender, _destination, its_message->get_reboot_flag(), its_message->get_session())) { VSOMEIP_INFO << "Reboot detected: IP=" << _sender.to_string(); + remove_remote_offer_type_by_ip(_sender); host_->expire_subscriptions(_sender); host_->expire_services(_sender); } @@ -1341,6 +1364,9 @@ void service_discovery_impl::process_serviceentry( // ID: SIP_SD_830 its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } + remove_remote_offer_type(its_service, its_instance, + (its_reliable_port != ILLEGAL_PORT ? + its_reliable_address : its_unreliable_address)); unsubscribe_all(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, @@ -1367,6 +1393,31 @@ void service_discovery_impl::process_offerservice_serviceentry( std::lock_guard its_lock(requested_mutex_); its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } + remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN); + if (_reliable_port != ILLEGAL_PORT + && _unreliable_port != ILLEGAL_PORT + && !_reliable_address.is_unspecified() + && !_unreliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::RELIABLE_UNRELIABLE; + } else if (_unreliable_port != ILLEGAL_PORT + && !_unreliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::UNRELIABLE; + } else if (_reliable_port != ILLEGAL_PORT + && !_reliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::RELIABLE; + } else { + VSOMEIP_WARNING << __func__ << ": unknown remote offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; + } + + if (update_remote_offer_type(_service,_instance, offer_type, + _reliable_address, _unreliable_address)) { + VSOMEIP_WARNING << __func__ << ": Remote offer type changed [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; + } + host_->add_routing_info(_service, _instance, _major, _minor, @@ -1392,6 +1443,8 @@ void service_discovery_impl::process_offerservice_serviceentry( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for (auto its_eventgroup : found_instance->second) { for (auto its_client : its_eventgroup.second) { std::shared_ptr its_subscription(its_client.second); @@ -1400,7 +1453,6 @@ void service_discovery_impl::process_offerservice_serviceentry( bool has_address(false); boost::asio::ip::address its_address; get_subscription_endpoints( - its_client.second->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, its_client.first); @@ -1420,66 +1472,113 @@ void service_discovery_impl::process_offerservice_serviceentry( } if (its_subscription->is_acknowledged()) { - if (its_subscription->get_endpoint(true) - && its_subscription->get_endpoint(true)->is_connected()) { - // 40 = 16 (subscription) + 2x12 (option) - check_space(40); - const std::size_t options_size_before = - _resubscribes->back().second->get_options().size(); - insert_subscription(_resubscribes->back().second, - _service, _instance, - its_eventgroup.first, - its_subscription, true, true); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast( - _resubscribes->back().first + (12u * diff - 24u)); - } else { - // don't insert reliable endpoint option if the - // TCP client endpoint is not yet connected + if (its_offer_type == remote_offer_type_e::UNRELIABLE && + its_unreliable && its_unreliable->is_connected()) { // 28 = 16 (subscription) + 12 (option) check_space(28); const std::size_t options_size_before = _resubscribes->back().second->get_options().size(); - insert_subscription(_resubscribes->back().second, + if (insert_subscription(_resubscribes->back().second, _service, _instance, its_eventgroup.first, - its_subscription, false, true); - its_client.second->set_tcp_connection_established(false); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast( - _resubscribes->back().first + (12u * diff - 12u)); - // restart TCP endpoint if not connected - if (its_subscription->get_endpoint(true) - && !its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->get_endpoint(true)->restart(); + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast( + _resubscribes->back().first + (12u * diff - 12u)); + } else { + _resubscribes->back().first = + static_cast( + _resubscribes->back().first - 28); + } + } else if (its_offer_type == remote_offer_type_e::RELIABLE && + its_reliable) { + if (its_reliable->is_connected()) { + // 28 = 16 (subscription) + 12 (option) + check_space(28); + const std::size_t options_size_before = + _resubscribes->back().second->get_options().size(); + if (insert_subscription(_resubscribes->back().second, + _service, _instance, + its_eventgroup.first, + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast( + _resubscribes->back().first + (12u * diff - 12u)); + } else { + _resubscribes->back().first = + static_cast( + _resubscribes->back().first - 28); + } + } else { + its_client.second->set_tcp_connection_established(false); + // restart TCP endpoint if not connected + its_reliable->restart(); + } + } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE) { + if (its_reliable && its_unreliable && + its_reliable->is_connected() && + its_unreliable->is_connected()) { + // 40 = 16 (subscription) + 2x12 (option) + check_space(40); + const std::size_t options_size_before = + _resubscribes->back().second->get_options().size(); + if (insert_subscription(_resubscribes->back().second, + _service, _instance, + its_eventgroup.first, + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast( + _resubscribes->back().first + (12u * diff - 24u)); + } else { + _resubscribes->back().first = + static_cast( + _resubscribes->back().first - 40); + } + } else if (its_reliable && !its_reliable->is_connected()) { + its_client.second->set_tcp_connection_established(false); + // restart TCP endpoint if not connected + its_reliable->restart(); } + } else { + VSOMEIP_WARNING << __func__ << ": unknown remote offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; } - its_subscription->set_acknowledged(false); } else { // 56 = 2x16 (subscription) + 2x12 (option) check_space(56); const std::size_t options_size_before = _resubscribes->back().second->get_options().size(); - insert_nack_subscription_on_resubscribe(_resubscribes->back().second, + if (insert_nack_subscription_on_resubscribe(_resubscribes->back().second, _service, _instance, its_eventgroup.first, - its_subscription); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast( - _resubscribes->back().first + (12u * diff - 24u)); + its_subscription, its_offer_type) ) { + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast( + _resubscribes->back().first + (12u * diff - 24u)); + } else { + _resubscribes->back().first = + static_cast( + _resubscribes->back().first - 56u); + } // restart TCP endpoint if not connected - if (its_subscription->get_endpoint(true) - && !its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->get_endpoint(true)->restart(); + if (its_reliable && !its_reliable->is_connected()) { + its_reliable->restart(); } } } @@ -1592,7 +1691,8 @@ void service_discovery_impl::on_endpoint_connected( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if(0 < found_instance->second.size()) { - + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for(const auto &its_eventgroup : found_instance->second) { for(const auto &its_client : its_eventgroup.second) { if (its_client.first != VSOMEIP_ROUTING_CLIENT) { @@ -1637,7 +1737,6 @@ void service_discovery_impl::on_endpoint_connected( std::shared_ptr its_unreliable; std::shared_ptr its_reliable; get_subscription_endpoints( - its_subscription->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, its_client.first); @@ -1645,7 +1744,7 @@ void service_discovery_impl::on_endpoint_connected( its_subscription->set_endpoint(its_unreliable, false); insert_subscription(its_message, _service, _instance, its_eventgroup.first, - its_subscription, true, true); + its_subscription, its_offer_type); its_subscription->set_acknowledged(false); } } @@ -2482,6 +2581,8 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for (auto found_eventgroup : found_instance->second) { auto found_client = found_eventgroup.second.find(_client); if (found_client != found_eventgroup.second.end()) { @@ -2490,7 +2591,6 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ bool has_address(false); boost::asio::ip::address its_address; get_subscription_endpoints( - found_client->second->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, found_client->first); @@ -2526,7 +2626,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (its_reliable->is_connected() && its_unreliable->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, true, true); + found_client->second, its_offer_type); found_client->second->set_tcp_connection_established(true); found_client->second->set_udp_connection_established(true); } else { @@ -2538,7 +2638,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if(endpoint->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, _reliable, !_reliable); + found_client->second, its_offer_type); found_client->second->set_tcp_connection_established(true); } else { // don't insert reliable endpoint option if the @@ -2549,7 +2649,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (endpoint->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, _reliable, !_reliable); + found_client->second, its_offer_type); found_client->second->set_udp_connection_established(true); } else { // don't insert unreliable endpoint option if the @@ -3517,5 +3617,78 @@ void service_discovery_impl::stop_last_msg_received_timer() { last_msg_received_timer_.cancel(ec); } +service_discovery_impl::remote_offer_type_e service_discovery_impl::get_remote_offer_type( + service_t _service, instance_t _instance) { + std::lock_guard its_lock(remote_offer_types_mutex_); + auto found_si = remote_offer_types_.find(std::make_pair(_service, _instance)); + if (found_si != remote_offer_types_.end()) { + return found_si->second; + } + return remote_offer_type_e::UNKNOWN; +} + +bool service_discovery_impl::update_remote_offer_type( + service_t _service, instance_t _instance, + remote_offer_type_e _offer_type, + const boost::asio::ip::address &_reliable_address, + const boost::asio::ip::address &_unreliable_address) { + bool ret(false); + std::lock_guard its_lock(remote_offer_types_mutex_); + const std::pair its_si_pair = std::make_pair(_service, _instance); + auto found_si = remote_offer_types_.find(its_si_pair); + if (found_si != remote_offer_types_.end()) { + if (found_si->second != _offer_type ) { + found_si->second = _offer_type; + ret = true; + } + } else { + remote_offer_types_[its_si_pair] = _offer_type; + } + switch (_offer_type) { + case remote_offer_type_e::UNRELIABLE: + remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::RELIABLE: + remote_offers_by_ip_[_reliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::UNKNOWN: + default: + VSOMEIP_WARNING << __func__ << ": unkown offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]" + << _offer_type; + break; + } + return ret; +} + +void service_discovery_impl::remove_remote_offer_type( + service_t _service, instance_t _instance, + const boost::asio::ip::address &_address) { + std::lock_guard its_lock(remote_offer_types_mutex_); + const std::pair its_si_pair = + std::make_pair(_service, _instance); + remote_offer_types_.erase(its_si_pair); + auto found_services = remote_offers_by_ip_.find(_address); + if (found_services != remote_offers_by_ip_.end()) { + found_services->second.erase(its_si_pair); + } +} + +void service_discovery_impl::remove_remote_offer_type_by_ip( + const boost::asio::ip::address &_address) { + std::lock_guard its_lock(remote_offer_types_mutex_); + auto found_services = remote_offers_by_ip_.find(_address); + if (found_services != remote_offers_by_ip_.end()) { + for (const auto& si : found_services->second) { + remote_offer_types_.erase(si); + } + } + remote_offers_by_ip_.erase(_address); +} + } // namespace sd } // namespace vsomeip diff --git a/test/subscribe_notify_one_tests/subscribe_notify_one_test_service.cpp b/test/subscribe_notify_one_tests/subscribe_notify_one_test_service.cpp index 9c96f733f..526664891 100644 --- a/test/subscribe_notify_one_tests/subscribe_notify_one_test_service.cpp +++ b/test/subscribe_notify_one_tests/subscribe_notify_one_test_service.cpp @@ -236,10 +236,6 @@ class subscribe_notify_one_test_service { case vsomeip::subscription_type_e::SU_RELIABLE: case vsomeip::subscription_type_e::SU_PREFER_UNRELIABLE: case vsomeip::subscription_type_e::SU_PREFER_RELIABLE: - if (all_notifications_received()) { - notify = true; - } - break; case vsomeip::subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: if (all_notifications_received_tcp_and_udp()) { notify = true; diff --git a/test/subscribe_notify_tests/subscribe_notify_test_service.cpp b/test/subscribe_notify_tests/subscribe_notify_test_service.cpp index 343976ec5..968d9edac 100644 --- a/test/subscribe_notify_tests/subscribe_notify_test_service.cpp +++ b/test/subscribe_notify_tests/subscribe_notify_test_service.cpp @@ -235,10 +235,6 @@ class subscribe_notify_test_service { case vsomeip::subscription_type_e::SU_RELIABLE: case vsomeip::subscription_type_e::SU_PREFER_UNRELIABLE: case vsomeip::subscription_type_e::SU_PREFER_RELIABLE: - if (all_notifications_received()) { - notify = true; - } - break; case vsomeip::subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: if (all_notifications_received()) { notify = true;