diff --git a/include/fc/network/udp_socket.hpp b/include/fc/network/udp_socket.hpp index d6354c29e..a05986f2d 100644 --- a/include/fc/network/udp_socket.hpp +++ b/include/fc/network/udp_socket.hpp @@ -22,8 +22,8 @@ namespace fc { void bind( const fc::ip::endpoint& ); size_t receive_from( char* b, size_t l, fc::ip::endpoint& from ); size_t receive_from( const std::shared_ptr& b, size_t l, fc::ip::endpoint& from ); - size_t send_to( const char* b, size_t l, const fc::ip::endpoint& to ); - size_t send_to( const std::shared_ptr& b, size_t l, const fc::ip::endpoint& to ); + void send_to( const char* b, size_t l, const fc::ip::endpoint& to ); + void send_to( const std::shared_ptr& b, size_t l, const fc::ip::endpoint& to ); void close(); void set_multicast_enable_loopback( bool ); diff --git a/src/log/console_appender.cpp b/src/log/console_appender.cpp index 41a997225..ec16d4a3b 100644 --- a/src/log/console_appender.cpp +++ b/src/log/console_appender.cpp @@ -21,6 +21,7 @@ namespace fc { class console_appender::impl { public: config cfg; + boost::mutex log_mutex; color::type lc[log_level::off+1]; #ifdef WIN32 HANDLE console_handle; @@ -83,10 +84,6 @@ namespace fc { } } - boost::mutex& log_mutex() { - static boost::mutex m; return m; - } - void console_appender::log( const log_message& m ) { FILE* out = stream::std_error ? stderr : stdout; @@ -116,7 +113,7 @@ namespace fc { std::string message = fc::format_string( m.get_format(), m.get_data(), my->cfg.max_object_depth ); line << message; - fc::unique_lock lock(log_mutex()); + std::unique_lock lock(my->log_mutex); print( line.str(), my->lc[m.get_context().get_log_level()] ); diff --git a/src/log/gelf_appender.cpp b/src/log/gelf_appender.cpp index c1828e5f2..0367bfa11 100644 --- a/src/log/gelf_appender.cpp +++ b/src/log/gelf_appender.cpp @@ -11,8 +11,12 @@ #include #include +#include + +#include #include #include +#include #include #include #include @@ -26,9 +30,11 @@ namespace fc config cfg; optional gelf_endpoint; udp_socket gelf_socket; + std::atomic gelf_log_counter; + boost::mutex gelf_log_mutex; impl(const config& c) : - cfg(c) + cfg(c), gelf_log_counter(0) { } @@ -96,7 +102,11 @@ namespace fc gelf_message["host"] = my->cfg.host; gelf_message["short_message"] = format_string( message.get_format(), message.get_data(), my->cfg.max_object_depth ); - gelf_message["timestamp"] = context.get_timestamp().time_since_epoch().count() / 1000000.; + const auto time_ns = context.get_timestamp().time_since_epoch().count(); + gelf_message["timestamp"] = time_ns / 1000000.; + gelf_message["_timestamp_ns"] = time_ns; + + gelf_message["_log_id"] = fc::to_string(++my->gelf_log_counter); switch (context.get_log_level()) { @@ -131,7 +141,7 @@ namespace fc string gelf_message_as_string; try { - gelf_message_as_string = json::to_string(gelf_message); + gelf_message_as_string = json::to_string(gelf_message, json::legacy_generator); } catch( const fc::assert_exception& e ) { @@ -149,6 +159,8 @@ namespace fc gelf_message_as_string[1] = (char)0x9c; assert(gelf_message_as_string[1] == (char)0x9c); + std::unique_lock lock(my->gelf_log_mutex); + // packets are sent by UDP, and they tend to disappear if they // get too large. It's hard to find any solid numbers on how // large they can be before they get dropped -- datagrams can diff --git a/src/network/udp_socket.cpp b/src/network/udp_socket.cpp index 66eb96db8..b896adb59 100644 --- a/src/network/udp_socket.cpp +++ b/src/network/udp_socket.cpp @@ -44,43 +44,51 @@ namespace fc { } } - size_t udp_socket::send_to( const char* buffer, size_t length, const ip::endpoint& to ) + void udp_socket::send_to( const char* buffer, size_t length, const ip::endpoint& to ) { try { - return my->_sock.send_to( boost::asio::buffer(buffer, length), to_asio_ep(to) ); + my->_sock.send_to( boost::asio::buffer(buffer, length), to_asio_ep(to) ); } catch( const boost::system::system_error& e ) { - if( e.code() != boost::asio::error::would_block ) - throw; + if(e.code() == boost::asio::error::would_block) + { + auto send_buffer_ptr = std::make_shared>(buffer, buffer+length); + my->_sock.async_send_to(boost::asio::buffer(send_buffer_ptr.get(), length), to_asio_ep(to), + [send_buffer_ptr]( const boost::system::error_code& /*ec*/, + std::size_t /*bytes_transferred*/ ) + { + // Swallow errors. Currently only used for GELF logging, so depend on local + // log to catch anything that doesn't make it across the network. + }); + } + // All other exceptions ignored. } - - promise::ptr completion_promise = promise::create("udp_socket::send_to"); - my->_sock.async_send_to( boost::asio::buffer(buffer, length), to_asio_ep(to), - asio::detail::read_write_handler(completion_promise) ); - - return completion_promise->wait(); } - size_t udp_socket::send_to( const std::shared_ptr& buffer, size_t length, + void udp_socket::send_to( const std::shared_ptr& buffer, size_t length, const fc::ip::endpoint& to ) { try { - return my->_sock.send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to) ); + my->_sock.send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to) ); } catch( const boost::system::system_error& e ) { - if( e.code() != boost::asio::error::would_block ) - throw; + if(e.code() == boost::asio::error::would_block) + { + auto preserved_buffer_ptr = buffer; + my->_sock.async_send_to(boost::asio::buffer(preserved_buffer_ptr.get(), length), to_asio_ep(to), + [preserved_buffer_ptr](const boost::system::error_code& /*ec*/, + std::size_t /*bytes_transferred*/) + { + // Swallow errors. Currently only used for GELF logging, so depend on local + // log to catch anything that doesn't make it across the network. + }); + } + // All other exceptions ignored. } - - promise::ptr completion_promise = promise::create("udp_socket::send_to"); - my->_sock.async_send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to), - asio::detail::read_write_handler_with_buffer(completion_promise, buffer) ); - - return completion_promise->wait(); } void udp_socket::open() {