Skip to content

Commit

Permalink
connection_pool now resizes correctly in the presence of pending conn…
Browse files Browse the repository at this point in the history
…ections

Changed the algorithm that calculates how many connections to create to
correctly take into account the number of pending requests.
This makes the pool resize as expected and avoids potential deadlocks.

close #395
  • Loading branch information
anarthal authored Jan 6, 2025
1 parent 25f9900 commit 1c67537
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ struct conn_shared_state
asio::basic_waitable_timer<ClockType> idle_connections_cv;

// The number of pending connections (currently getting ready).
// Controls that we don't create connections while some are still connecting
// Required to compute how many connections we should create at any given point in time.
std::size_t num_pending_connections{0};

// The number of async_get_connection ops that are waiting for a connection to become available.
// Required to compute how many connections we should create at any given point in time.
std::size_t num_pending_requests{0};

// Info about the last connection attempt. Already processed, suitable to be used
// as the result of an async_get_connection op
diagnostics last_connect_diag;
Expand Down Expand Up @@ -237,8 +241,8 @@ class basic_connection_node : public intrusive::list_base_hook<>,

// Not thread-safe
template <class CompletionToken>
auto async_run(CompletionToken&& token
) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
auto async_run(CompletionToken&& token)
-> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
{
return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
#include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
#include <boost/mysql/impl/internal/coroutine.hpp>

#include <boost/asio/any_completion_handler.hpp>
Expand Down Expand Up @@ -97,29 +98,53 @@ class basic_pool_impl
return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
}

// Do we have room for a new connection?
// Don't create new connections if we have other connections pending
// (i.e. being connected, reset... ) - otherwise pool size increases
// for no reason when there is no connectivity.
bool can_create_connection() const
{
return all_conns_.size() < params_.max_size && shared_st_.num_pending_connections == 0u &&
state_ == state_t::running;
}

// Create and run one connection
void create_connection()
{
// Connection tasks always run in the pool's executor
all_conns_.emplace_back(params_, pool_ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
all_conns_.back().async_run(asio::bind_executor(pool_ex_, asio::detached));
}

void maybe_create_connection()
// Create and run connections as required by the current config and state
void create_connections()
{
if (can_create_connection())
// Calculate how many we should create
std::size_t n = num_connections_to_create(
params_.initial_size,
params_.max_size,
all_conns_.size(),
shared_st_.num_pending_connections,
shared_st_.num_pending_requests
);

// Create them
BOOST_ASSERT((all_conns_.size() + n) <= params_.max_size);
for (std::size_t i = 0; i < n; ++i)
create_connection();
}

// An async_get_connection request is about to wait for an available connection
void enter_request_pending()
{
// Record that we're pending
++shared_st_.num_pending_requests;

// Create new connections, if required.
// Don't create any connections if we're not yet running,
// since this would leave connections running after run exits
if (state_ == state_t::running)
create_connections();
}

// An async_get_connection request finished waiting
void exit_request_pending()
{
// Record that we're no longer pending
BOOST_ASSERT(shared_st_.num_pending_requests > 0u);
--shared_st_.num_pending_requests;
}

node_type* try_get_connection()
{
if (!shared_st_.idle_list.empty())
Expand Down Expand Up @@ -205,8 +230,7 @@ class basic_pool_impl
obj_->state_ = state_t::running;

// Create the initial connections
for (std::size_t i = 0; i < obj_->params_.initial_size; ++i)
obj_->create_connection();
obj_->create_connections();

// Wait for the cancel notification to arrive.
BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
Expand Down Expand Up @@ -334,12 +358,15 @@ class basic_pool_impl
break;
}

// No luck. If there is room for more connections, create one.
obj->maybe_create_connection();
// No luck. Record that we're waiting for a connection.
obj->enter_request_pending();

// Wait to be notified, or until a cancellation happens
BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self))

// Record that we're no longer pending
obj->exit_request_pending();

// Remember that we have waited, so completions are dispatched
// correctly
has_waited = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>

#include <algorithm>
#include <cstddef>

namespace boost {
namespace mysql {
namespace detail {
Expand Down Expand Up @@ -240,6 +243,38 @@ inline diagnostics create_connect_diagnostics(error_code connect_ec, const diagn
return res;
}

// Given config params and the current state, computes the number
// of connections that the pool should create at any given point in time
inline std::size_t num_connections_to_create(
std::size_t initial_size, // config
std::size_t max_size, // config
std::size_t current_connections, // the number of connections in the pool, in any state
std::size_t pending_connections, // the number of connections in the pool in pending state
std::size_t pending_requests // the current number of async_get_connection requests that are waiting
)
{
BOOST_ASSERT(initial_size <= max_size);
BOOST_ASSERT(current_connections <= max_size);
BOOST_ASSERT(pending_connections <= current_connections);

// We aim to have one pending connection per pending request.
// When these connections successfully connect, they will fulfill the pending requests.
std::size_t required_by_requests = pending_requests > pending_connections
? static_cast<std::size_t>(pending_requests - pending_connections)
: 0u;

// We should always have at least min_connections.
// This might not be the case if the pool is just starting.
std::size_t required_by_min = current_connections < initial_size
? static_cast<std::size_t>(initial_size - current_connections)
: 0u;

// We can't excess max_connections. This is the room for new connections that we have
std::size_t room = static_cast<std::size_t>(max_size - current_connections);

return (std::min)((std::max)(required_by_requests, required_by_min), room);
}

} // namespace detail
} // namespace mysql
} // namespace boost
Expand Down
55 changes: 49 additions & 6 deletions test/integration/test/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/mysql/any_connection.hpp>
#include <boost/mysql/client_errc.hpp>
#include <boost/mysql/connection_pool.hpp>
#include <boost/mysql/diagnostics.hpp>
Expand All @@ -28,6 +29,7 @@

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <memory>
#include <stdexcept>
Expand Down Expand Up @@ -291,6 +293,43 @@ BOOST_FIXTURE_TEST_CASE(connections_created_if_required, fixture)
std::move(run_result).validate_no_error_nodiag();
}

std::int64_t get_connection_id(any_connection& conn)
{
results r;
conn.async_execute("SELECT CONNECTION_ID()", r, as_netresult).validate_no_error();
return r.rows().at(0).at(0).as_uint64();
}

// Regression check: https://github.com/boostorg/mysql/issues/395
// If there are more outstanding connection requests than pending connections,
// connections are created
BOOST_FIXTURE_TEST_CASE(connections_created_if_requests_gt_pending, fixture)
{
connection_pool pool(ctx, create_pool_params());
auto run_result = pool.async_run(as_netresult);

// Request several connections in parallel
auto conn1_result = pool.async_get_connection(diag, as_netresult);
auto conn2_result = pool.async_get_connection(diag, as_netresult);
auto conn3_result = pool.async_get_connection(diag, as_netresult);

// Resolve the requests
auto conn1 = std::move(conn1_result).get();
auto conn2 = std::move(conn2_result).get();
auto conn3 = std::move(conn3_result).get();

// They should be different connections
auto conn1_id = get_connection_id(conn1.get());
auto conn2_id = get_connection_id(conn2.get());
auto conn3_id = get_connection_id(conn3.get());
BOOST_TEST(conn1_id != conn2_id);
BOOST_TEST(conn1_id != conn3_id);

// Cleanup the pool
pool.cancel();
std::move(run_result).validate_no_error_nodiag();
}

BOOST_FIXTURE_TEST_CASE(connection_upper_limit, fixture)
{
connection_pool pool(ctx, create_pool_params(1));
Expand All @@ -314,19 +353,23 @@ BOOST_FIXTURE_TEST_CASE(connection_upper_limit, fixture)
// If a connection is requested before calling run, we wait
BOOST_DATA_TEST_CASE_F(fixture, get_connection_before_run, data::make({false, true}))
{
auto params = create_pool_params(1);
auto params = create_pool_params(151);
params.thread_safe = sample;
connection_pool pool(ctx, std::move(params));

// Get a connection before calling run
auto getconn_result = pool.async_get_connection(diag, as_netresult);
// Get some connections before calling run
auto getconn1_result = pool.async_get_connection(diag, as_netresult);
auto getconn2_result = pool.async_get_connection(diag, as_netresult);

// Call run
auto run_result = pool.async_run(as_netresult);

// Success
auto conn = std::move(getconn_result).get();
conn->async_ping(as_netresult).validate_no_error();
auto conn1 = std::move(getconn1_result).get();
auto conn2 = std::move(getconn2_result).get();

// They're different connections
BOOST_TEST(get_connection_id(conn1.get()) != get_connection_id(conn2.get()));

// Cleanup the pool
pool.cancel();
Expand Down Expand Up @@ -433,7 +476,7 @@ BOOST_DATA_TEST_CASE_F(fixture, pooled_connection_extends_pool_lifetime, data::m
}

// Having a packaged async_get_connection op extends lifetime
BOOST_FIXTURE_TEST_CASE(async_get_connection_initation_extends_pool_lifetime, fixture)
BOOST_FIXTURE_TEST_CASE(async_get_connection_initiation_extends_pool_lifetime, fixture)
{
std::unique_ptr<connection_pool> pool(new connection_pool(ctx, create_pool_params()));

Expand Down
Loading

0 comments on commit 1c67537

Please sign in to comment.