Merge pull request ClickHouse#61053 from ClickHouse/async-loader-waiters-limit

Separate limits on number of waiting and executing queries
robot-ch-test-poll authored Mar 22, 2024
2 parents bc87d88 + c31b958 commit 03a4b5f
Showing 9 changed files with 293 additions and 22 deletions.
16 changes: 14 additions & 2 deletions docs/en/operations/server-configuration-parameters/settings.md
@@ -379,6 +379,18 @@ Type: UInt64

Default: 0

## max_waiting_queries

Limit on total number of concurrently waiting queries. Execution of a waiting query is blocked while required tables are loading asynchronously (see `async_load_databases`). Note that waiting queries are not counted when `max_concurrent_queries`, `max_concurrent_insert_queries`, `max_concurrent_select_queries`, `max_concurrent_queries_for_user` and `max_concurrent_queries_for_all_users` limits are checked. This correction is done to avoid hitting these limits just after server startup. Zero means unlimited.

:::note
This setting can be modified at runtime and will take effect immediately. Queries that are already running will remain unchanged.
:::

Type: UInt64

Default: 0
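For example, adding `<max_waiting_queries>50</max_waiting_queries>` to the server configuration (the value `50` is purely illustrative) caps the number of queries blocked on asynchronously loading tables; since waiting queries are excluded from the `max_concurrent_*` limits, this is the only limit that applies to them while they wait.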

## max_connections

Max server connections.
@@ -1725,7 +1737,7 @@ Default value: `0.5`.

Asynchronous loading of databases and tables.

If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up. Also consider setting a limit `max_waiting_queries` for the total number of waiting queries.

If `false`, all databases are loaded when the server starts.

@@ -2926,7 +2938,7 @@ Default: 0

## ignore_empty_sql_security_in_create_view_query {#ignore_empty_sql_security_in_create_view_query}

If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries.

:::note
This setting is only necessary for the migration period and will become obsolete in 24.4
1 change: 1 addition & 0 deletions programs/server/Server.cpp
@@ -1440,6 +1440,7 @@ try
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(new_server_settings.max_concurrent_select_queries);
global_context->getProcessList().setMaxWaitingQueriesAmount(new_server_settings.max_waiting_queries);

if (config->has("keeper_server"))
    global_context->updateKeeperConfiguration(*config);
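The diff does not show the `ProcessList` side of `setMaxWaitingQueriesAmount`; purely as an illustration of the bookkeeping such a setter implies, a self-contained sketch could look like the following (the class and member names here are invented for the example and are not the actual ClickHouse API):

```cpp
#include <atomic>
#include <cstddef>
#include <stdexcept>

// Illustrative stand-in for a process-list-style limiter (not the real ClickHouse ProcessList).
class WaitingQueryLimiter
{
public:
    void setMaxWaitingQueriesAmount(size_t max) { max_waiting_queries.store(max); }

    // Called when a query is about to block on an asynchronously loading table.
    void onQueryStartsWaiting()
    {
        size_t limit = max_waiting_queries.load();
        size_t current = waiting_queries.fetch_add(1) + 1;
        if (limit != 0 && current > limit) // 0 means unlimited, matching the server setting
        {
            waiting_queries.fetch_sub(1);
            throw std::runtime_error("Too many simultaneously waiting queries");
        }
    }

    // Called when the awaited table finishes loading (or the wait fails).
    void onQueryStopsWaiting() { waiting_queries.fetch_sub(1); }

private:
    std::atomic<size_t> max_waiting_queries{0};
    std::atomic<size_t> waiting_queries{0};
};
```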
21 changes: 20 additions & 1 deletion src/Common/AsyncLoader.cpp
@@ -140,6 +140,11 @@ void LoadJob::finish()
    finish_time = std::chrono::system_clock::now();
    if (waiters > 0)
        finished.notify_all();
    else
    {
        // Nobody is waiting for this job, so the waiter hooks can be dropped right away;
        // this releases any state captured by the hook lambdas (e.g. an external waiter counter).
        on_waiters_increment = {};
        on_waiters_decrement = {};
    }
}

void LoadJob::scheduled(UInt64 job_id_)
@@ -765,11 +770,25 @@ void AsyncLoader::wait(std::unique_lock<std::mutex> & job_lock, const LoadJobPtr
    if (job->load_status != LoadStatus::PENDING) // Shortcut just to avoid incrementing ProfileEvents
        return;

    if (job->on_waiters_increment)
        job->on_waiters_increment(job);

    // WARNING: it is important not to throw below this point to avoid `on_waiters_increment` call w/o matching `on_waiters_decrement` call

    Stopwatch watch;
    job->waiters++;
    job->finished.wait(job_lock, [&] { return job->load_status != LoadStatus::PENDING; });
    job->waiters--;
    ProfileEvents::increment(ProfileEvents::AsyncLoaderWaitMicroseconds, watch.elapsedMicroseconds());

    if (job->on_waiters_decrement)
        job->on_waiters_decrement(job);

    if (job->waiters == 0)
    {
        job->on_waiters_increment = {};
        job->on_waiters_decrement = {};
    }
}

bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> &)
@@ -859,7 +878,7 @@ void AsyncLoader::worker(Pool & pool)
        try
        {
            current_load_job = job.get();
            SCOPE_EXIT({ current_load_job = nullptr; }); // Note that recursive job execution is not supported, but jobs can wait one another
            job->execute(*this, pool_id, job);
            exception_from_job = {};
        }
65 changes: 64 additions & 1 deletion src/Common/AsyncLoader.h
@@ -59,7 +59,8 @@ enum class LoadStatus
class LoadJob : private boost::noncopyable
{
public:
template <class LoadJobSetType, class Func, class DFFunc>
// NOTE: makeLoadJob() helper should be used instead of direct ctor call
template <class LoadJobSetType, class DFFunc, class Func>
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, DFFunc && dependency_failure_, Func && func_)
: dependencies(std::forward<LoadJobSetType>(dependencies_))
, name(std::move(name_))
@@ -69,6 +70,19 @@ class LoadJob : private boost::noncopyable
, func(std::forward<Func>(func_))
{}

// NOTE: makeLoadJob() helper should be used instead of direct ctor call
template <class LoadJobSetType, class WIFunc, class WDFunc, class DFFunc, class Func>
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, WIFunc && on_waiters_increment_, WDFunc && on_waiters_decrement_, DFFunc && dependency_failure_, Func && func_)
: dependencies(std::forward<LoadJobSetType>(dependencies_))
, name(std::move(name_))
, execution_pool_id(pool_id_)
, pool_id(pool_id_)
, on_waiters_increment(std::forward<WIFunc>(on_waiters_increment_))
, on_waiters_decrement(std::forward<WDFunc>(on_waiters_decrement_))
, dependency_failure(std::forward<DFFunc>(dependency_failure_))
, func(std::forward<Func>(func_))
{}

// Current job status.
LoadStatus status() const;
std::exception_ptr exception() const;
@@ -112,6 +126,13 @@ class LoadJob : private boost::noncopyable
std::atomic<size_t> execution_pool_id;
std::atomic<size_t> pool_id;

// Handlers that are called by every new waiting thread, just before it goes to sleep.
// If `on_waiters_increment` throws, the wait is canceled and the corresponding `on_waiters_decrement` will never be called.
// They can be used to count waiters and enforce a limit on their number.
// Note that implementations are called under `LoadJob::mutex` and should be fast.
std::function<void(const LoadJobPtr & self)> on_waiters_increment;
std::function<void(const LoadJobPtr & self)> on_waiters_decrement;

// Handler for failed or canceled dependencies.
// If job needs to be canceled on `dependency` failure, then function should set `cancel` to a specific reason.
// Note that implementation should be fast and cannot use AsyncLoader, because it is called under `AsyncLoader::mutex`.
@@ -140,8 +161,50 @@ void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & depen
void ignoreDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel);

template <class F> concept LoadJobDependencyFailure = std::invocable<F, const LoadJobPtr &, const LoadJobPtr &, std::exception_ptr &>;
template <class F> concept LoadJobOnWaiters = std::invocable<F, const LoadJobPtr &>;
template <class F> concept LoadJobFunc = std::invocable<F, AsyncLoader &, const LoadJobPtr &>;

LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
}

LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
}
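Taken together, the eight new overloads mirror the existing `makeLoadJob()` family: dependencies can be moved or copied, `pool_id` can be given or defaulted, and `dependency_failure` can be supplied or defaulted to `cancelOnDependencyFailure`, with the two waiter hooks always accepted right after the job name. A minimal call following the pattern of the new unit test further down might look like this (the hook lambdas are placeholders; see `TEST(AsyncLoader, WaitersLimit)` for working ones):

```cpp
// Sketch: construct a job whose waiters are tracked (and possibly rejected) by user-supplied hooks.
auto job = makeLoadJob({}, "limited_job",
    /* on_waiters_increment = */ [] (const LoadJobPtr &) { /* e.g. bump a counter, throw to reject */ },
    /* on_waiters_decrement = */ [] (const LoadJobPtr &) { /* e.g. decrement the counter */ },
    /* func = */ [] (AsyncLoader &, const LoadJobPtr &) { /* actual loading work */ });
```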


LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
{
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
66 changes: 66 additions & 0 deletions src/Common/tests/gtest_async_loader.cpp
@@ -643,6 +643,72 @@ TEST(AsyncLoader, CustomDependencyFailure)
ASSERT_EQ(good_count.load(), 3);
}

TEST(AsyncLoader, WaitersLimit)
{
    AsyncLoaderTest t(16);

    // Hooks shared by all waiters: at most 5 threads may register as waiters of the job;
    // the 6th and later increments throw, so the corresponding wait() calls are canceled.
    std::atomic<int> waiters_total{0};
    int waiters_limit = 5;
    auto waiters_inc = [&] (const LoadJobPtr &) {
        int value = waiters_total.load();
        while (true)
        {
            if (value >= waiters_limit)
                throw Exception(ErrorCodes::ASYNC_LOAD_FAILED, "Too many waiters: {}", value);
            if (waiters_total.compare_exchange_strong(value, value + 1))
                break;
        }
    };
    auto waiters_dec = [&] (const LoadJobPtr &) {
        waiters_total.fetch_sub(1);
    };

    std::barrier sync(2);
    t.loader.start();

    // The job blocks at (A) until the main thread arrives, keeping all admitted waiters pending.
    auto job_func = [&] (AsyncLoader &, const LoadJobPtr &) {
        sync.arrive_and_wait(); // (A)
    };

    auto job = makeLoadJob({}, "job", waiters_inc, waiters_dec, job_func);
    auto task = t.schedule({job});

    // Start 10 waiter threads: exactly 5 should be admitted and 5 rejected by the increment hook.
    std::atomic<int> failure{0};
    std::atomic<int> success{0};
    std::vector<std::thread> waiters;
    waiters.reserve(10);
    auto waiter = [&] {
        try
        {
            t.loader.wait(job);
            success.fetch_add(1);
        }
        catch(...)
        {
            failure.fetch_add(1);
        }
    };

    for (int i = 0; i < 10; i++)
        waiters.emplace_back(waiter);

    while (failure.load() != 5)
        std::this_thread::yield();

    ASSERT_EQ(job->waitersCount(), 5);

    sync.arrive_and_wait(); // (A)

    for (auto & thread : waiters)
        thread.join();

    ASSERT_EQ(success.load(), 5);
    ASSERT_EQ(failure.load(), 5);
    ASSERT_EQ(waiters_total.load(), 0);

    t.loader.wait();
}
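Assuming the usual googletest build, this test can presumably be run in isolation with a filter such as `--gtest_filter=AsyncLoader.WaitersLimit` against the unit-test binary (commonly `unit_tests_dbms` in ClickHouse builds, though the exact target name depends on the build setup).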

TEST(AsyncLoader, TestConcurrency)
{
AsyncLoaderTest t(10);
1 change: 1 addition & 0 deletions src/Core/ServerSettings.h
@@ -64,6 +64,7 @@ namespace DB
M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \
M(UInt64, max_waiting_queries, 0, "Maximum number of concurrently waiting queries blocked due to `async_load_databases`. Note that waiting queries are not considered by `max_concurrent_*queries*` limits. Zero means unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
