diff --git a/include/cinatra/coro_http_client.hpp b/include/cinatra/coro_http_client.hpp index 7473c7de..063ccb68 100644 --- a/include/cinatra/coro_http_client.hpp +++ b/include/cinatra/coro_http_client.hpp @@ -2424,7 +2424,7 @@ class coro_http_client : public std::enable_shared_from_this { bool enable_follow_redirect_ = false; bool enable_timeout_ = false; std::chrono::steady_clock::duration conn_timeout_duration_ = - std::chrono::seconds(8); + std::chrono::seconds(30); std::chrono::steady_clock::duration req_timeout_duration_ = std::chrono::seconds(60); bool enable_tcp_no_delay_ = true; diff --git a/include/cinatra/ylt/coro_io/channel.hpp b/include/cinatra/ylt/coro_io/channel.hpp index bdaec570..a7962c97 100644 --- a/include/cinatra/ylt/coro_io/channel.hpp +++ b/include/cinatra/ylt/coro_io/channel.hpp @@ -139,7 +139,7 @@ class channel { struct RandomLoadBlancer { async_simple::coro::Lazy> operator()( const channel& channel) { - static thread_local std::default_random_engine e; + static thread_local std::default_random_engine e(std::time(nullptr)); std::uniform_int_distribution rnd{std::size_t{0}, channel.client_pools_.size() - 1}; co_return channel.client_pools_[rnd(e)]; @@ -192,6 +192,13 @@ class channel { return ch; } + /** + * @brief return the channel's hosts size. + * + * @return std::size_t + */ + std::size_t size() const noexcept { return client_pools_.size(); } + private: void init(const std::vector& hosts, const channel_config& config, const std::vector& weights, diff --git a/include/cinatra/ylt/coro_io/client_pool.hpp b/include/cinatra/ylt/coro_io/client_pool.hpp index f51fca9a..89af0393 100644 --- a/include/cinatra/ylt/coro_io/client_pool.hpp +++ b/include/cinatra/ylt/coro_io/client_pool.hpp @@ -120,7 +120,7 @@ class client_pool : public std::enable_shared_from_this< do { CINATRA_LOG_TRACE << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" - << client->get_port() << "}, try count:" << i + << client->get_port() << "}, try count:" << i + 1 << "max retry limit:" << self->pool_config_.connect_retry_count; auto pre_time_point = std::chrono::steady_clock::now(); @@ -151,32 +151,6 @@ class client_pool : public std::enable_shared_from_this< client = nullptr; } - struct promise_handler { - std::atomic flag_ = false; - async_simple::Promise> promise_; - }; - - static async_simple::coro::Lazy connect_client( - std::unique_ptr client, std::weak_ptr watcher, - std::shared_ptr handler) { - co_await reconnect(client, watcher); - auto has_get_connect = handler->flag_.exchange(true); - if (!has_get_connect) { - handler->promise_.setValue(std::move(client)); - } - else { - if (client) { - auto self = watcher.lock(); - auto conn_lim = - std::min(10u, self->pool_config_.max_connection); - if (self && self->free_clients_.size() < conn_lim) { - self->enqueue(self->free_clients_, std::move(client), - self->pool_config_.idle_timeout); - } - } - } - } - async_simple::coro::Lazy> get_client( const typename client_t::config& client_config) { std::unique_ptr client; @@ -193,47 +167,7 @@ class client_pool : public std::enable_shared_from_this< CINATRA_LOG_ERROR << "init client config failed."; co_return nullptr; } - auto client_ptr = client.get(); - auto handler = std::make_shared(); - connect_client(std::move(client), this->weak_from_this(), handler) - .start([](auto&&) { - }); - auto timer = std::make_shared( - executor->get_asio_executor()); - timer->expires_after(std::chrono::milliseconds{20}); - timer->async_await().start([watcher = this->weak_from_this(), handler, - client_ptr, timer](auto&& res) { - if (res.value() && !handler->flag_) { - if (auto self = watcher.lock(); self) { - ++self->promise_cnt_; - self->promise_queue_.enqueue(handler); - timer->expires_after( - (std::max)(std::chrono::milliseconds{0}, - self->pool_config_.max_connection_time - - std::chrono::milliseconds{20})); - timer->async_await().start([handler = std::move(handler), - client_ptr = client_ptr](auto&& res) { - auto has_get_connect = handler->flag_.exchange(true); - if (!has_get_connect) { - CINATRA_LOG_ERROR << "Out of max limitation of connect " - "time, connect " - "failed. skip wait client{" - << client_ptr << "} connect. "; - handler->promise_.setValue(std::unique_ptr{nullptr}); - } - }); - } - } - }); - CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_ - << "}"; - client = co_await handler->promise_.getFuture(); - if (client) { - executor->schedule([timer] { - std::error_code ignore_ec; - timer->cancel(ignore_ec); - }); - } + co_await reconnect(client, this->weak_from_this()); } else { CINATRA_LOG_TRACE << "get free client{" << client.get() @@ -264,30 +198,10 @@ class client_pool : public std::enable_shared_from_this< void collect_free_client(std::unique_ptr client) { if (!client->has_closed()) { - std::shared_ptr handler; - if (promise_cnt_) { - int cnt = 0; - while (promise_queue_.try_dequeue(handler)) { - ++cnt; - auto has_get_connect = handler->flag_.exchange(true); - if (!has_get_connect) { - handler->promise_.setValue(std::move(client)); - promise_cnt_ -= cnt; - CINATRA_LOG_TRACE << "collect free client{" << client.get() - << "} and wake up promise{" << &handler->promise_ - << "}"; - return; - } - } - promise_cnt_ -= cnt; - } - if (free_clients_.size() < pool_config_.max_connection) { - if (client) { - CINATRA_LOG_TRACE << "collect free client{" << client.get() - << "} enqueue"; - enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); - } + CINATRA_LOG_TRACE << "collect free client{" << client.get() + << "} enqueue"; + enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); } else { CINATRA_LOG_TRACE << "out of max connection limit <<" @@ -334,7 +248,6 @@ class client_pool : public std::enable_shared_from_this< std::chrono::milliseconds reconnect_wait_time{1000}; std::chrono::milliseconds idle_timeout{30000}; std::chrono::milliseconds short_connect_idle_timeout{1000}; - std::chrono::milliseconds max_connection_time{60000}; typename client_t::config client_config; }; @@ -454,8 +367,6 @@ class client_pool : public std::enable_shared_from_this< coro_io::detail::client_queue> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; - std::atomic promise_cnt_ = 0; - moodycamel::ConcurrentQueue> promise_queue_; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_;