Add support for custom thread pool #68

Merged
merged 3 commits into from
Jan 18, 2025

@dian-lun-lin dian-lun-lin commented Dec 17, 2024

This PR introduces support for custom thread pools. Allowing users to define and manage their own thread pools gives them greater flexibility and control over multithreaded execution. This is particularly useful for users who need to integrate with existing thread management frameworks.

APIs (applicable to every index):

auto index  = assemble(..., num_threads);                       // Create a default thread pool with num_threads
auto index2 = assemble(..., CustomThreadPool(num_threads));     // Create a custom thread pool with num_threads
auto index3 = build(..., num_threads);                          // Create a default thread pool with num_threads
auto index4 = build(..., CustomThreadPool(num_threads));        // Create a custom thread pool with num_threads
index4.set_threadpool(CustomThreadPool2(num_threads));          // Destroy the original thread pool and set to the new thread pool during runtime

auto& threadpool_handle = index4.get_threadpool_handle();       // Get the thread pool handle
auto& threadpool = threadpool_handle.get<CustomThreadPool2>();  // Get the thread pool as its concrete type (must match the pool currently set)

size_t num_threads = index4.get_num_threads();                  // Get the number of threads used in the thread pool
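To make the handle/get<T>() pattern above more concrete, here is a minimal sketch of how a type-erased thread pool handle could be built. Everything in it (PoolHandle, InlinePool, OtherPool, and the choice to throw std::runtime_error on a type mismatch) is a hypothetical illustration, not the SVS implementation; the real handle may store and check types differently.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical type-erased handle; SVS's actual handle may differ.
class PoolHandle {
    struct Concept {
        virtual ~Concept() = default;
        virtual std::size_t size() = 0;
        virtual void parallel_for(std::function<void(std::size_t)> f, std::size_t n) = 0;
    };
    template <typename Pool> struct Model final : Concept {
        explicit Model(Pool p) : pool{std::move(p)} {}
        std::size_t size() override { return pool.size(); }
        void parallel_for(std::function<void(std::size_t)> f, std::size_t n) override {
            pool.parallel_for(std::move(f), n);
        }
        Pool pool;
    };
    std::unique_ptr<Concept> impl_;

  public:
    // Takes ownership of any pool type that provides size() and parallel_for().
    template <typename Pool>
    explicit PoolHandle(Pool pool)
        : impl_{std::make_unique<Model<Pool>>(std::move(pool))} {}

    std::size_t size() { return impl_->size(); }
    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        impl_->parallel_for(std::move(f), n);
    }

    // Recover the concrete pool; throws if T is not the stored type.
    template <typename T> T& get() {
        auto* model = dynamic_cast<Model<T>*>(impl_.get());
        if (model == nullptr) {
            throw std::runtime_error("thread pool handle holds a different type");
        }
        return model->pool;
    }
};

// Two stand-in pools that run every task inline on the calling thread.
struct InlinePool {
    std::size_t threads;
    std::size_t size() { return threads; }
    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) f(i);
    }
};
struct OtherPool {
    std::size_t size() { return 1; }
    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) f(i);
    }
};

// Replacing the pool at runtime destroys the old one, mirroring set_threadpool().
inline void demo_handle() {
    PoolHandle handle(InlinePool{4});
    assert(handle.get<InlinePool>().threads == 4);
    handle = PoolHandle(OtherPool{});
    assert(handle.size() == 1);
}
```

In this sketch the handle owns the pool, so swapping in a new pool releases the old one, and asking for the wrong concrete type fails loudly rather than returning a mistyped reference.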

There are two implementation requirements for a custom thread pool to work with SVS:

  1. size(): This method should return the number of threads in the thread pool.
  2. parallel_for(std::function<void(size_t)> f, size_t n): This method should execute the tasks. Here, f(i) represents the task for the i-th partition, and n is the number of partitions to execute.

Example:

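class CustomThreadPool {
  public:
  CustomThreadPool(size_t num_threads): num_threads_{num_threads} {
  }

  // SVS requirement #1
  size_t size() {
    // return the number of workers
    return num_threads_;
  }

  // SVS requirement #2
  void parallel_for(std::function<void(size_t)> f, size_t n) {
    // execute f(0), f(1), ..., f(n-1) tasks
    // (placeholder: runs the tasks sequentially; a real pool dispatches them to its workers)
    for (size_t i = 0; i < n; ++i) {
      f(i);
    }
  }

  private:
  size_t num_threads_;
};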
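As an illustration of the two requirements, the following sketch implements them with plain std::thread. SpawningThreadPool and visits_each_partition_once are hypothetical names, not part of SVS, and the sketch assumes parallel_for is expected to return only after all n tasks have finished. It spawns threads on every call for simplicity; a production pool would keep its workers alive between calls.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical example pool; not part of SVS.
class SpawningThreadPool {
  public:
    explicit SpawningThreadPool(std::size_t num_threads) : num_threads_{num_threads} {
        assert(num_threads > 0);
    }

    // Requirement #1: report the number of workers.
    std::size_t size() const { return num_threads_; }

    // Requirement #2: run f(0), ..., f(n-1) and return once all have finished
    // (blocking behavior is an assumption of this sketch).
    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        std::atomic<std::size_t> next{0};
        auto worker = [&]() {
            // Each worker repeatedly claims the next unprocessed partition index.
            for (std::size_t i = next.fetch_add(1); i < n; i = next.fetch_add(1)) {
                f(i);
            }
        };
        const std::size_t spawned = std::min(num_threads_, n);
        std::vector<std::thread> threads;
        threads.reserve(spawned);
        for (std::size_t t = 0; t < spawned; ++t) {
            threads.emplace_back(worker);
        }
        for (auto& th : threads) {
            th.join();
        }
    }

  private:
    std::size_t num_threads_;
};

// Self-check: every partition index is visited exactly once.
inline bool visits_each_partition_once(std::size_t num_threads, std::size_t n) {
    SpawningThreadPool pool(num_threads);
    std::vector<int> hits(n, 0);
    // Each task writes only its own element, so no extra locking is needed.
    pool.parallel_for([&](std::size_t i) { ++hits[i]; }, n);
    return std::all_of(hits.begin(), hits.end(), [](int h) { return h == 1; });
}
```

Claiming indices through a shared atomic counter balances uneven task costs across workers without any per-task queueing.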

Changes:

  1. Remove set_num_threads() at the C++ level to eliminate the need for a resizable thread pool. In the Python binding, index.num_threads = num_threads is bound to index.set_threadpool(svs::threads::DefaultThreadPool(num_threads)). This destroys the original thread pool and creates a new one with the desired num_threads. Changing the thread count therefore costs more than before; however, this call should be made very rarely.
  2. Modify some test cases to replace set_num_threads() with set_threadpool(svs::threads::DefaultThreadPool(num_threads)), since set_num_threads() has been removed.
  3. Remove the thread pool reference support in Flat. In the original version, Flat could either own the thread pool or hold a reference to it. This is inconsistent with the other indices. Moreover, reference semantics can still be achieved by implementing a thread pool reference wrapper. For example:
class CustomThreadPoolReferenceWrapper {
  public:
  CustomThreadPoolReferenceWrapper(CustomThreadPool& threadpool): threadpool_{threadpool} {
  }

  // SVS requirement #1
  size_t size() {
    return threadpool_.size();
  }

  // SVS requirement #2
  void parallel_for(std::function<void(size_t)> f, size_t n) {
     // forward to the wrapped pool (run was renamed to parallel_for in this PR)
     threadpool_.parallel_for(std::move(f), n);
  }

  private:
  // non-owning reference: the wrapped pool must outlive this wrapper
  CustomThreadPool& threadpool_;
};

Owning this thread pool reference wrapper is essentially the same as holding a reference to the thread pool.
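The wrapper idea generalizes to any pool type. The sketch below is a hypothetical generic version (PoolRef and CountingPool are illustrative names; the ThreadPoolReferenceWrapper utility added in this PR may be shaped differently). It shows two wrappers sharing one underlying pool, which is the situation the removed reference mode in Flat used to cover.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical generic wrapper; the PR's ThreadPoolReferenceWrapper may differ.
template <typename Pool>
class PoolRef {
  public:
    explicit PoolRef(Pool& pool) : pool_{&pool} {}

    std::size_t size() { return pool_->size(); }

    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        pool_->parallel_for(std::move(f), n);
    }

  private:
    Pool* pool_;  // non-owning; a pointer keeps the wrapper copy-assignable
};

// Stand-in pool that runs tasks inline and counts how often it is used.
class CountingPool {
  public:
    std::size_t size() { return 1; }
    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        ++calls_;
        for (std::size_t i = 0; i < n; ++i) f(i);
    }
    std::size_t calls() const { return calls_; }

  private:
    std::size_t calls_ = 0;
};

// Two wrappers forward to the same pool, so its call count reflects both users.
inline void demo_shared_pool() {
    CountingPool pool;
    PoolRef<CountingPool> first(pool);
    PoolRef<CountingPool> second(pool);
    first.parallel_for([](std::size_t) {}, 3);
    second.parallel_for([](std::size_t) {}, 3);
    assert(pool.calls() == 2);
}
```

Because the wrapper is non-owning, the caller is responsible for keeping the wrapped pool alive for as long as any index holds a wrapper to it.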

  4. Support custom thread pools in kmeans.
  5. Support custom thread pools in ReferenceDataset.
  6. Add CppAsyncThreadPool and QueueThreadPool for both examples and testing.
  7. Add ThreadPoolReferenceWrapper as a utility reference wrapper.
  8. Rename run to parallel_for, which is more meaningful and precise.
  9. Replace FunctionRef with std::function<void(size_t)>. Since parallel_for is now exposed to users, std::function<void(size_t)> is the more general choice. I did not observe a significant increase in binary size, and the change does not impact performance.
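For a sense of how small a conforming pool can be, here is a sketch built on std::async. AsyncPool and sum_of_squares are hypothetical names, and this is not the CppAsyncThreadPool added in the PR, whose implementation may differ. Note that this sketch launches one asynchronous task per partition, so size() is only a nominal value here rather than a hard limit on concurrency.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical std::async-based pool; the PR's CppAsyncThreadPool may differ.
class AsyncPool {
  public:
    explicit AsyncPool(std::size_t num_threads) : num_threads_{num_threads} {
        assert(num_threads > 0);
    }

    std::size_t size() const { return num_threads_; }

    void parallel_for(std::function<void(std::size_t)> f, std::size_t n) {
        std::vector<std::future<void>> futures;
        futures.reserve(n);
        for (std::size_t i = 0; i < n; ++i) {
            // f is passed by reference; it outlives every task because all
            // futures are waited on before this function returns.
            futures.push_back(std::async(std::launch::async, std::ref(f), i));
        }
        for (auto& fut : futures) {
            fut.get();  // waits for the task and rethrows any exception it raised
        }
    }

  private:
    std::size_t num_threads_;
};

// Usage: each task fills its own slot, then the results are reduced serially.
inline long sum_of_squares(std::size_t n) {
    AsyncPool pool(2);
    std::vector<long> out(n, 0);
    pool.parallel_for([&](std::size_t i) { out[i] = static_cast<long>(i * i); }, n);
    return std::accumulate(out.begin(), out.end(), 0L);
}
```

Accepting std::function<void(size_t)> is what lets a capturing lambda like the one in sum_of_squares be handed to the pool directly.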

@dian-lun-lin dian-lun-lin marked this pull request as ready for review December 18, 2024 21:54
@dian-lun-lin dian-lun-lin merged commit 1b0ba34 into intel:main Jan 18, 2025
7 checks passed