Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix client pool #616

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool enable_follow_redirect_ = false;
bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8);
std::chrono::seconds(30);
std::chrono::steady_clock::duration req_timeout_duration_ =
std::chrono::seconds(60);
bool enable_tcp_no_delay_ = true;
Expand Down
9 changes: 8 additions & 1 deletion include/cinatra/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class channel {
struct RandomLoadBlancer {
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
static thread_local std::default_random_engine e;
static thread_local std::default_random_engine e(std::time(nullptr));
std::uniform_int_distribution rnd{std::size_t{0},
channel.client_pools_.size() - 1};
co_return channel.client_pools_[rnd(e)];
Expand Down Expand Up @@ -192,6 +192,13 @@ class channel {
return ch;
}

/**
* @brief return the channel's hosts size.
*
* @return std::size_t
*/
std::size_t size() const noexcept { return client_pools_.size(); }

private:
void init(const std::vector<std::string_view>& hosts,
const channel_config& config, const std::vector<int>& weights,
Expand Down
99 changes: 5 additions & 94 deletions include/cinatra/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class client_pool : public std::enable_shared_from_this<
do {
CINATRA_LOG_TRACE << "try to reconnect client{" << client.get()
<< "},host:{" << client->get_host() << ":"
<< client->get_port() << "}, try count:" << i
<< client->get_port() << "}, try count:" << i + 1
<< "max retry limit:"
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -151,32 +151,6 @@ class client_pool : public std::enable_shared_from_this<
client = nullptr;
}

struct promise_handler {
std::atomic<bool> flag_ = false;
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

static async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
co_await reconnect(client, watcher);
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
if (client) {
auto self = watcher.lock();
auto conn_lim =
std::min<unsigned>(10u, self->pool_config_.max_connection);
if (self && self->free_clients_.size() < conn_lim) {
self->enqueue(self->free_clients_, std::move(client),
self->pool_config_.idle_timeout);
}
}
}
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;
Expand All @@ -193,47 +167,7 @@ class client_pool : public std::enable_shared_from_this<
CINATRA_LOG_ERROR << "init client config failed.";
co_return nullptr;
}
auto client_ptr = client.get();
auto handler = std::make_shared<promise_handler>();
connect_client(std::move(client), this->weak_from_this(), handler)
.start([](auto&&) {
});
auto timer = std::make_shared<coro_io::period_timer>(
executor->get_asio_executor());
timer->expires_after(std::chrono::milliseconds{20});
timer->async_await().start([watcher = this->weak_from_this(), handler,
client_ptr, timer](auto&& res) {
if (res.value() && !handler->flag_) {
if (auto self = watcher.lock(); self) {
++self->promise_cnt_;
self->promise_queue_.enqueue(handler);
timer->expires_after(
(std::max)(std::chrono::milliseconds{0},
self->pool_config_.max_connection_time -
std::chrono::milliseconds{20}));
timer->async_await().start([handler = std::move(handler),
client_ptr = client_ptr](auto&& res) {
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
CINATRA_LOG_ERROR << "Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. ";
handler->promise_.setValue(std::unique_ptr<client_t>{nullptr});
}
});
}
}
});
CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_
<< "}";
client = co_await handler->promise_.getFuture();
if (client) {
executor->schedule([timer] {
std::error_code ignore_ec;
timer->cancel(ignore_ec);
});
}
co_await reconnect(client, this->weak_from_this());
}
else {
CINATRA_LOG_TRACE << "get free client{" << client.get()
Expand Down Expand Up @@ -264,30 +198,10 @@ class client_pool : public std::enable_shared_from_this<

void collect_free_client(std::unique_ptr<client_t> client) {
if (!client->has_closed()) {
std::shared_ptr<promise_handler> handler;
if (promise_cnt_) {
int cnt = 0;
while (promise_queue_.try_dequeue(handler)) {
++cnt;
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_
<< "}";
return;
}
}
promise_cnt_ -= cnt;
}

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
else {
CINATRA_LOG_TRACE << "out of max connection limit <<"
Expand Down Expand Up @@ -334,7 +248,6 @@ class client_pool : public std::enable_shared_from_this<
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout{30000};
std::chrono::milliseconds short_connect_idle_timeout{1000};
std::chrono::milliseconds max_connection_time{60000};
typename client_t::config client_config;
};

Expand Down Expand Up @@ -454,8 +367,6 @@ class client_pool : public std::enable_shared_from_this<
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
std::atomic<int> promise_cnt_ = 0;
moodycamel::ConcurrentQueue<std::shared_ptr<promise_handler>> promise_queue_;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
Expand Down
Loading