Skip to content

Commit

Permalink
Documented connection_pool execution semantics
Browse files Browse the repository at this point in the history
Refactored pool internals to remove wait_group
Added an external strand thread-safety test

close #361
  • Loading branch information
anarthal authored Oct 10, 2024
1 parent b7b2061 commit bb568ba
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 241 deletions.
29 changes: 29 additions & 0 deletions include/boost/mysql/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,18 @@ class connection_pool
* \par Handler signature
* The handler signature for this operation is `void(boost::mysql::error_code)`
*
* \par Executor
*
* The final handler is executed using `token`'s associated executor,
* or `this->get_executor()` if the token doesn't have an associated
* executor. The final handler is called as if it was submitted using `asio::post`,
* and is never be called inline from within this function.
*
* If the pool was constructed with thread-safety enabled, intermediate
* completion handlers are executed using an internal strand that wraps `this->get_executor()`.
* Otherwise, intermediate handlers are executed using `this->get_executor()`.
* In any case, the token's associated executor is only used for the final handler.
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation. Cancelling `async_run`
* is equivalent to calling \ref connection_pool::cancel.
Expand Down Expand Up @@ -583,6 +595,23 @@ class connection_pool
* The handler signature for this operation is
* `void(boost::mysql::error_code, boost::mysql::pooled_connection)`
*
* \par Executor
*
* If the final handler has an associated immediate executor, and the operation
* completes immediately, the final handler is dispatched to it.
* Otherwise, the final handler is called as if it was submitted using `asio::post`,
* and is never be called inline from within this function.
* Immediate completions can only happen when thread-safety is not enabled.
*
* The final handler is executed using `token`'s associated executor,
* or `this->get_executor()` if the token doesn't have an associated
* executor.
*
* If the pool was constructed with thread-safety enabled, intermediate
* completion handlers are executed using an internal strand that wraps `this->get_executor()`.
* Otherwise, intermediate handlers are executed using
* `token`'s associated executor if it has one, or `this->get_executor()` if it hasn't.
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation.
* Cancelling `async_get_connection` has no observable side effects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,24 @@ struct conn_shared_state
// as the result of an async_get_connection op
diagnostics last_connect_diag;

// The number of running connections, to track when they exit
std::size_t num_running_connections{0};

// Timer acting as a condition variable to wait for all connections to exit
asio::basic_waitable_timer<ClockType> conns_finished_cv;

conn_shared_state(asio::any_io_executor ex)
: idle_connections_cv(std::move(ex), (ClockType::time_point::max)())
: idle_connections_cv(ex, (ClockType::time_point::max)()),
conns_finished_cv(std::move(ex), (ClockType::time_point::max)())
{
}

void on_connection_start() { ++num_running_connections; }

void on_connection_finish()
{
if (--num_running_connections == 0u)
conns_finished_cv.expires_at((ClockType::time_point::min)());
}
};

Expand All @@ -71,7 +86,7 @@ class basic_connection_node : public intrusive::list_base_hook<>,
using this_type = basic_connection_node<ConnectionType, ClockType>;
using timer_type = asio::basic_waitable_timer<ClockType>;

// Not thread-safe, must be manipulated within the pool's executor
// Not thread-safe
const internal_pool_params* params_;
conn_shared_state<ConnectionType, ClockType>* shared_st_;
ConnectionType conn_;
Expand Down Expand Up @@ -123,7 +138,15 @@ class basic_connection_node : public intrusive::list_base_hook<>,
connection_task_op(this_type& node) noexcept : node_(node) {}

template <class Self>
void operator()(Self& self, error_code ec = {})
void operator()(Self& self)
{
// Called when the op starts
node_.shared_st_->on_connection_start();
(*this)(self, error_code());
}

template <class Self>
void operator()(Self& self, error_code ec)
{
// A collection status may be generated by idle_wait actions
auto col_st = last_act_ == next_connection_action::idle_wait
Expand Down Expand Up @@ -178,7 +201,10 @@ class basic_connection_node : public intrusive::list_base_hook<>,
self
);
break;
case next_connection_action::none: self.complete(error_code()); break;
case next_connection_action::none:
node_.shared_st_->on_connection_finish();
self.complete(error_code());
break;
default: BOOST_ASSERT(false); // LCOV_EXCL_LINE
}
}
Expand All @@ -201,35 +227,36 @@ class basic_connection_node : public intrusive::list_base_hook<>,
{
}

// Not thread-safe
void cancel()
{
sansio_connection_node<this_type>::cancel();
timer_.cancel();
collection_timer_.cancel();
}

// This initiation must be invoked within the pool's executor
// Not thread-safe
template <class CompletionToken>
auto async_run(CompletionToken&& token
) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
{
return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
}

ConnectionType& connection() noexcept { return conn_; }
const ConnectionType& connection() const noexcept { return conn_; }

// Not thread-safe, must be called within the pool's executor
// Not thread-safe
void notify_collectable() { collection_timer_.cancel(); }

// Thread-safe. May be safely be called from any thread.
// Thread-safe
void mark_as_collectable(bool should_reset) noexcept
{
collection_state_.store(
should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
);
}

// Getter, used by pooled_connection
ConnectionType& connection() noexcept { return conn_; }

// Exposed for testing
collection_state get_collection_state() const noexcept { return collection_state_; }
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
#include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
#include <boost/mysql/impl/internal/coroutine.hpp>

#include <boost/asio/any_completion_handler.hpp>
Expand All @@ -31,7 +30,7 @@
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/immediate.hpp>
Expand Down Expand Up @@ -89,7 +88,6 @@ class basic_pool_impl
state_t state_{state_t::initial};
std::list<node_type> all_conns_;
shared_state_type shared_st_;
wait_group wait_gp_;
timer_type cancel_timer_;
const pipeline_request reset_pipeline_req_{make_reset_pipeline()};

Expand All @@ -111,8 +109,9 @@ class basic_pool_impl

void create_connection()
{
// Connection tasks always run in the pool's executor
all_conns_.emplace_back(params_, pool_ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
all_conns_.back().async_run(asio::bind_executor(pool_ex_, asio::detached));
}

void maybe_create_connection()
Expand Down Expand Up @@ -225,7 +224,11 @@ class basic_pool_impl
obj_->shared_st_.idle_connections_cv.expires_at((ClockType::time_point::min)());

// Wait for all connection tasks to exit
BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
BOOST_MYSQL_YIELD(
resume_point_,
4,
obj_->shared_st_.conns_finished_cv.async_wait(std::move(self))
)

// Done
cancel_slot_.clear();
Expand Down Expand Up @@ -419,7 +422,6 @@ class basic_pool_impl
conn_ex_(params.connection_executor ? std::move(params.connection_executor) : original_pool_ex_),
params_(make_internal_pool_params(std::move(params))),
shared_st_(pool_ex_),
wait_gp_(pool_ex_),
cancel_timer_(pool_ex_, (std::chrono::steady_clock::time_point::max)())
{
}
Expand Down
67 changes: 0 additions & 67 deletions include/boost/mysql/impl/internal/connection_pool/wait_group.hpp

This file was deleted.

7 changes: 6 additions & 1 deletion test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ local requirements =
<define>BOOST_ASIO_HAS_DEFAULT_FUNCTION_TEMPLATE_ARGUMENTS=1
<define>BOOST_ALLOW_DEPRECATED_HEADERS=1
# Disable warning C4702: unreachable code, produced by Boost.Asio buffer.hpp
<toolset>msvc:<cxxflags>"/bigobj /wd4702 /permissive-"
# Remove /wd4100 when PFR fixes warnings
<toolset>msvc:<cxxflags>"/bigobj /wd4702 /wd4100 /permissive-"
<toolset>msvc:<define>_SCL_SECURE_NO_WARNINGS=1
<toolset>msvc:<define>_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING
<toolset>msvc:<define>_SILENCE_CXX17_ADAPTOR_TYPEDEFS_DEPRECATION_WARNING
Expand All @@ -74,6 +75,10 @@ local requirements =
<toolset>gcc,<thread-sanitizer>norecover:<cxxflags>-Wno-tsan
# gcc-11 emits spurious warnings for valid vector::insert ops
<toolset>gcc-11:<cxxflags>-Wno-stringop-overflow
# TODO: remove when PFR unused warnings are fixed
# https://github.com/boostorg/pfr/pull/187
<toolset>gcc:<cxxflags>-Wno-unused-parameter
<toolset>clang:<cxxflags>-Wno-unused-parameter
<target-os>linux:<define>_XOPEN_SOURCE=600
<target-os>linux:<define>_GNU_SOURCE=1
<target-os>windows:<define>_WIN32_WINNT=0x0601
Expand Down
1 change: 1 addition & 0 deletions test/thread_safety/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local tests =
connection_pool
connection_pool_two_contexts
connection_pool_coroutines
connection_pool_external_strand
connection_pool_cancel
connection_pool_cancel_get_connection
;
Expand Down
Loading

0 comments on commit bb568ba

Please sign in to comment.