fix client pool (#616)
qicosmos authored Jul 22, 2024
1 parent eb4becc commit c197a18
Showing 3 changed files with 14 additions and 96 deletions.
2 changes: 1 addition & 1 deletion include/cinatra/coro_http_client.hpp
@@ -2424,7 +2424,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
   bool enable_follow_redirect_ = false;
   bool enable_timeout_ = false;
   std::chrono::steady_clock::duration conn_timeout_duration_ =
-      std::chrono::seconds(8);
+      std::chrono::seconds(30);
   std::chrono::steady_clock::duration req_timeout_duration_ =
       std::chrono::seconds(60);
   bool enable_tcp_no_delay_ = true;
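The only change here is the default connect timeout, raised from 8 s to 30 s, while the per-request timeout stays at 60 s. A minimal standalone sketch of how two such budgets are typically applied (the `timeouts` struct and names below are illustrative, not cinatra's API): the connect budget bounds the handshake, the request budget bounds the whole exchange.

```cpp
#include <chrono>
#include <iostream>
#include <thread>

// Illustrative names only: the two budgets mirror
// conn_timeout_duration_ (now 30 s) and req_timeout_duration_ (60 s).
struct timeouts {
  std::chrono::steady_clock::duration connect = std::chrono::seconds(30);
  std::chrono::steady_clock::duration request = std::chrono::seconds(60);
};

int main() {
  timeouts budget;
  auto start = std::chrono::steady_clock::now();
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // pretend handshake work
  auto elapsed = std::chrono::steady_clock::now() - start;
  // The connect budget only covers establishing the connection; the request
  // budget covers the whole exchange and is checked separately.
  std::cout << "handshake within connect budget: " << std::boolalpha
            << (elapsed < budget.connect) << '\n';
}
```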
9 changes: 8 additions & 1 deletion include/cinatra/ylt/coro_io/channel.hpp
@@ -139,7 +139,7 @@ class channel {
   struct RandomLoadBlancer {
     async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
         const channel& channel) {
-      static thread_local std::default_random_engine e;
+      static thread_local std::default_random_engine e(std::time(nullptr));
       std::uniform_int_distribution rnd{std::size_t{0},
                                         channel.client_pools_.size() - 1};
       co_return channel.client_pools_[rnd(e)];
@@ -192,6 +192,13 @@ class channel {
     return ch;
   }
 
+  /**
+   * @brief return the channel's hosts size.
+   *
+   * @return std::size_t
+   */
+  std::size_t size() const noexcept { return client_pools_.size(); }
+
  private:
   void init(const std::vector<std::string_view>& hosts,
             const channel_config& config, const std::vector<int>& weights,
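The load-balancer fix above seeds the per-thread engine so different threads and runs do not all replay the index sequence of a default-constructed std::default_random_engine. A standalone sketch of the same idea, assuming a plain vector of host strings (pick_pool and the host list are illustrative names, not cinatra's):

```cpp
#include <ctime>
#include <iostream>
#include <random>
#include <string>
#include <vector>

// One engine per thread, seeded once per thread with the current time,
// mirroring the RandomLoadBlancer change in this commit.
std::size_t pick_pool(std::size_t pool_count) {
  static thread_local std::default_random_engine e(std::time(nullptr));
  std::uniform_int_distribution<std::size_t> rnd{0, pool_count - 1};
  return rnd(e);
}

int main() {
  std::vector<std::string> hosts{"a:80", "b:80", "c:80"};
  for (int i = 0; i < 5; ++i)
    std::cout << hosts[pick_pool(hosts.size())] << '\n';  // random host per call
}
```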
99 changes: 5 additions & 94 deletions include/cinatra/ylt/coro_io/client_pool.hpp
@@ -120,7 +120,7 @@ class client_pool : public std::enable_shared_from_this<
     do {
       CINATRA_LOG_TRACE << "try to reconnect client{" << client.get()
                         << "},host:{" << client->get_host() << ":"
-                        << client->get_port() << "}, try count:" << i
+                        << client->get_port() << "}, try count:" << i + 1
                         << "max retry limit:"
                         << self->pool_config_.connect_retry_count;
       auto pre_time_point = std::chrono::steady_clock::now();
@@ -151,32 +151,6 @@ class client_pool : public std::enable_shared_from_this<
       client = nullptr;
     }
 
-  struct promise_handler {
-    std::atomic<bool> flag_ = false;
-    async_simple::Promise<std::unique_ptr<client_t>> promise_;
-  };
-
-  static async_simple::coro::Lazy<void> connect_client(
-      std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
-      std::shared_ptr<promise_handler> handler) {
-    co_await reconnect(client, watcher);
-    auto has_get_connect = handler->flag_.exchange(true);
-    if (!has_get_connect) {
-      handler->promise_.setValue(std::move(client));
-    }
-    else {
-      if (client) {
-        auto self = watcher.lock();
-        auto conn_lim =
-            std::min<unsigned>(10u, self->pool_config_.max_connection);
-        if (self && self->free_clients_.size() < conn_lim) {
-          self->enqueue(self->free_clients_, std::move(client),
-                        self->pool_config_.idle_timeout);
-        }
-      }
-    }
-  }
-
   async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
       const typename client_t::config& client_config) {
     std::unique_ptr<client_t> client;
@@ -193,47 +167,7 @@ class client_pool : public std::enable_shared_from_this<
       CINATRA_LOG_ERROR << "init client config failed.";
       co_return nullptr;
     }
-    auto client_ptr = client.get();
-    auto handler = std::make_shared<promise_handler>();
-    connect_client(std::move(client), this->weak_from_this(), handler)
-        .start([](auto&&) {
-        });
-    auto timer = std::make_shared<coro_io::period_timer>(
-        executor->get_asio_executor());
-    timer->expires_after(std::chrono::milliseconds{20});
-    timer->async_await().start([watcher = this->weak_from_this(), handler,
-                                client_ptr, timer](auto&& res) {
-      if (res.value() && !handler->flag_) {
-        if (auto self = watcher.lock(); self) {
-          ++self->promise_cnt_;
-          self->promise_queue_.enqueue(handler);
-          timer->expires_after(
-              (std::max)(std::chrono::milliseconds{0},
-                         self->pool_config_.max_connection_time -
-                             std::chrono::milliseconds{20}));
-          timer->async_await().start([handler = std::move(handler),
-                                      client_ptr = client_ptr](auto&& res) {
-            auto has_get_connect = handler->flag_.exchange(true);
-            if (!has_get_connect) {
-              CINATRA_LOG_ERROR << "Out of max limitation of connect "
-                                   "time, connect "
-                                   "failed. skip wait client{"
-                                << client_ptr << "} connect. ";
-              handler->promise_.setValue(std::unique_ptr<client_t>{nullptr});
-            }
-          });
-        }
-      }
-    });
-    CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_
-                      << "}";
-    client = co_await handler->promise_.getFuture();
-    if (client) {
-      executor->schedule([timer] {
-        std::error_code ignore_ec;
-        timer->cancel(ignore_ec);
-      });
-    }
+    co_await reconnect(client, this->weak_from_this());
     }
     else {
       CINATRA_LOG_TRACE << "get free client{" << client.get()
@@ -264,30 +198,10 @@ class client_pool : public std::enable_shared_from_this<
 
   void collect_free_client(std::unique_ptr<client_t> client) {
     if (!client->has_closed()) {
-      std::shared_ptr<promise_handler> handler;
-      if (promise_cnt_) {
-        int cnt = 0;
-        while (promise_queue_.try_dequeue(handler)) {
-          ++cnt;
-          auto has_get_connect = handler->flag_.exchange(true);
-          if (!has_get_connect) {
-            handler->promise_.setValue(std::move(client));
-            promise_cnt_ -= cnt;
-            CINATRA_LOG_TRACE << "collect free client{" << client.get()
-                              << "} and wake up promise{" << &handler->promise_
-                              << "}";
-            return;
-          }
-        }
-        promise_cnt_ -= cnt;
-      }
-
       if (free_clients_.size() < pool_config_.max_connection) {
-        if (client) {
-          CINATRA_LOG_TRACE << "collect free client{" << client.get()
-                            << "} enqueue";
-          enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
-        }
+        CINATRA_LOG_TRACE << "collect free client{" << client.get()
+                          << "} enqueue";
+        enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
       }
       else {
         CINATRA_LOG_TRACE << "out of max connection limit <<"
@@ -334,7 +248,6 @@ class client_pool : public std::enable_shared_from_this<
     std::chrono::milliseconds reconnect_wait_time{1000};
     std::chrono::milliseconds idle_timeout{30000};
     std::chrono::milliseconds short_connect_idle_timeout{1000};
-    std::chrono::milliseconds max_connection_time{60000};
     typename client_t::config client_config;
   };
 
@@ -454,8 +367,6 @@ class client_pool : public std::enable_shared_from_this<
   coro_io::detail::client_queue<std::unique_ptr<client_t>>
       short_connect_clients_;
   client_pools_t* pools_manager_ = nullptr;
-  std::atomic<int> promise_cnt_ = 0;
-  moodycamel::ConcurrentQueue<std::shared_ptr<promise_handler>> promise_queue_;
   async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
   std::string host_name_;
   pool_config pool_config_;
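Taken together, the client_pool.hpp changes drop the promise/timer machinery: get_client now simply awaits reconnect when no idle client is available, and collect_free_client keeps a returned client only while the free list is under max_connection. A synchronous standalone sketch of that simplified shape (simple_pool, fake_client and the inline "connect" are illustrative stand-ins, not the real coroutine-based implementation):

```cpp
#include <deque>
#include <iostream>
#include <memory>

// Stand-in for a pooled connection; "connecting" always succeeds here.
struct fake_client {
  bool closed = false;
};

struct simple_pool {
  std::size_t max_connection = 2;
  std::deque<std::unique_ptr<fake_client>> free_clients;

  // After this commit, get_client either reuses an idle client or connects a
  // new one inline (no promise raced against a 20 ms timer).
  std::unique_ptr<fake_client> get_client() {
    if (!free_clients.empty()) {
      auto c = std::move(free_clients.front());
      free_clients.pop_front();
      return c;  // reuse an idle connection
    }
    return std::make_unique<fake_client>();  // stands in for co_await reconnect(...)
  }

  // A returned client is kept only while the free list is under max_connection;
  // otherwise it is dropped and the connection closes with the unique_ptr.
  void collect_free_client(std::unique_ptr<fake_client> c) {
    if (!c->closed && free_clients.size() < max_connection) {
      free_clients.push_back(std::move(c));
    }
  }
};

int main() {
  simple_pool pool;
  auto a = pool.get_client();
  auto b = pool.get_client();
  auto c = pool.get_client();
  pool.collect_free_client(std::move(a));
  pool.collect_free_client(std::move(b));
  pool.collect_free_client(std::move(c));  // dropped: already at max_connection
  std::cout << "idle clients kept: " << pool.free_clients.size() << '\n';  // prints 2
}
```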
