
Commit

enable clang format
dian-lun-lin committed Jan 17, 2025
1 parent b37a020 commit 0903d93
Showing 28 changed files with 671 additions and 527 deletions.
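
The reformatting in the hunks below (wrapping near a 90-column limit, one argument per line when a call no longer fits, broken template parameter lists, constructor initializers on their own line, short functions kept on a single line, and sorted includes) is consistent with a .clang-format along the following lines. This is only a sketch inferred from the diff; the options and values in the repository's actual configuration may differ.

BasedOnStyle: LLVM
ColumnLimit: 90
IndentWidth: 4
PointerAlignment: Left
SortIncludes: true
BinPackArguments: false
BinPackParameters: false
AllowShortFunctionsOnASingleLine: Inline
AlwaysBreakTemplateDeclarations: MultiLine
BreakConstructorInitializers: BeforeColon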
7 changes: 5 additions & 2 deletions bindings/python/src/python_bindings.cpp
@@ -204,12 +204,15 @@ Convert the `fvecs` file on disk with 32-bit floating point entries to a `fvecs`

// Intel(R) MKL
m.def(
"have_mkl", &svs::python::have_mkl, "Return whether or not svs is linked with Intel(R) MKL."
"have_mkl",
&svs::python::have_mkl,
"Return whether or not svs is linked with Intel(R) MKL."
);
m.def(
"mkl_num_threads",
&svs::python::mkl_num_threads,
"Return the number of threads used by Intel(R) MKL, or None if svs is not linked with Intel(R) MKL."
"Return the number of threads used by Intel(R) MKL, or None if svs is not linked "
"with Intel(R) MKL."
);

///// Indexes
183 changes: 88 additions & 95 deletions examples/cpp/custom_thread_pool.cpp
@@ -16,146 +16,136 @@

//! [Includes]
// SVS Dependencies
#include "svs/orchestrators/vamana.h" // bulk of the dependencies required.
#include "svs/core/recall.h" // Convenient k-recall@n computation.
#include "svs/lib/threads.h" // Thread pool-related dependencies.
#include "svs/core/recall.h" // Convenient k-recall@n computation.
#include "svs/lib/threads.h" // Thread pool-related dependencies.
#include "svs/orchestrators/vamana.h" // bulk of the dependencies required.

// Alternative main definition
#include "svsmain.h"

// stl
#include <future>
#include <map>
#include <queue>
#include <string>
#include <string_view>
#include <vector>
#include <queue>
#include <future>
//! [Includes]

namespace {

//! [Helper Utilities]
template <typename T>
struct MoC {
MoC(T&& rhs): obj(std::move(rhs)) {}
MoC(const MoC& other): obj(std::move(other.obj)) {}
template <typename T> struct MoC {
MoC(T&& rhs)
: obj(std::move(rhs)) {}
MoC(const MoC& other)
: obj(std::move(other.obj)) {}
T& get() { return obj; }
mutable T obj;
};
//! [Helper Utilities]

//! [Custom thread pool implementation]
class CustomThreadPool {

public:
explicit CustomThreadPool(size_t num_threads) {
threads_.reserve(num_threads);
for(size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back([this]() {
while(!stop_) {
std::function<void()> task;
{
std::unique_lock lock(mtx_);
while(queue_.empty() && !stop_) {
cv_.wait(lock);
}
if(!queue_.empty()) {
task = queue_.front();
queue_.pop();
}
}

if(task) {
task();
}
}
});
}
}

CustomThreadPool(CustomThreadPool&&) = delete;
CustomThreadPool(const CustomThreadPool&) = delete;
CustomThreadPool& operator=(CustomThreadPool&&) = delete;
CustomThreadPool& operator=(const CustomThreadPool&) = delete;

~CustomThreadPool() {
shutdown();
for(auto& t: threads_) {
t.join();
}
}

template <typename C>
std::future<void> insert(C&& task) {
std::promise<void> prom;
std::future<void> fu = prom.get_future();
{
std::scoped_lock lock(mtx_);
queue_.push([moc=MoC{std::move(prom)}, task=std::forward<C>(task)]() mutable {
task();
moc.obj.set_value();
});
}
cv_.notify_one();
return fu;
}

size_t size() const {
return threads_.size();
}

void shutdown() {
std::scoped_lock lock(mtx_);
stop_ = true;
cv_.notify_all();
}
explicit CustomThreadPool(size_t num_threads) {
threads_.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back([this]() {
while (!stop_) {
std::function<void()> task;
{
std::unique_lock lock(mtx_);
while (queue_.empty() && !stop_) {
cv_.wait(lock);
}
if (!queue_.empty()) {
task = queue_.front();
queue_.pop();
}
}

if (task) {
task();
}
}
});
}
}

private:
CustomThreadPool(CustomThreadPool&&) = delete;
CustomThreadPool(const CustomThreadPool&) = delete;
CustomThreadPool& operator=(CustomThreadPool&&) = delete;
CustomThreadPool& operator=(const CustomThreadPool&) = delete;

std::vector<std::thread> threads_;
std::mutex mtx_;
std::condition_variable cv_;
~CustomThreadPool() {
shutdown();
for (auto& t : threads_) {
t.join();
}
}

bool stop_{false};
std::queue<std::function<void()>> queue_;
template <typename C> std::future<void> insert(C&& task) {
std::promise<void> prom;
std::future<void> fu = prom.get_future();
{
std::scoped_lock lock(mtx_);
queue_.push([moc = MoC{std::move(prom)},
task = std::forward<C>(task)]() mutable {
task();
moc.obj.set_value();
});
}
cv_.notify_one();
return fu;
}

size_t size() const { return threads_.size(); }

void shutdown() {
std::scoped_lock lock(mtx_);
stop_ = true;
cv_.notify_all();
}

private:
std::vector<std::thread> threads_;
std::mutex mtx_;
std::condition_variable cv_;

bool stop_{false};
std::queue<std::function<void()>> queue_;
};

/////
///// The wrapper for CustomThreadPool to work on SVS
/////
class CustomThreadPoolWrapper {

public:
CustomThreadPoolWrapper(size_t num_threads)
: threadpool_{std::make_unique<CustomThreadPool>(num_threads)} {}

CustomThreadPoolWrapper(size_t num_threads): threadpool_{std::make_unique<CustomThreadPool>(num_threads)} {
}

void parallel_for(std::function<void(size_t)> f, size_t n) {
void parallel_for(std::function<void(size_t)> f, size_t n) {
std::vector<std::future<void>> futures;
futures.reserve(n);
for(size_t i = 0; i < n; ++i) {
futures.emplace_back(threadpool_->insert([&f, i]() {
f(i);
}));
for (size_t i = 0; i < n; ++i) {
futures.emplace_back(threadpool_->insert([&f, i]() { f(i); }));
}

// wait until all tasks are finished
for(auto& fu: futures) {
for (auto& fu : futures) {
fu.get();
}
}
}

size_t size() const {
return threadpool_->size();
}
size_t size() const { return threadpool_->size(); }

private:

std::unique_ptr<CustomThreadPool> threadpool_;
std::unique_ptr<CustomThreadPool> threadpool_;
};
static_assert(svs::threads::ThreadPool<CustomThreadPoolWrapper>);
//! [Custom thread pool implementation]
}
} // namespace

//! [Helper Utilities]
double run_recall(
@@ -217,7 +207,10 @@ int svs_main(std::vector<std::string> args) {
//! [Index Build]
size_t num_threads = 4;
svs::Vamana index = svs::Vamana::build<float>(
parameters, svs::VectorDataLoader<float>(data_vecs), svs::DistanceL2(), CustomThreadPoolWrapper(num_threads)
parameters,
svs::VectorDataLoader<float>(data_vecs),
svs::DistanceL2(),
CustomThreadPoolWrapper(num_threads)
);
//! [Index Build]

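
The MoC ("move-on-copy") helper in the example above exists because std::function requires a copyable callable while std::promise is move-only; wrapping the promise lets the completion lambda be stored in the pool's queue of std::function objects. A standalone sketch of the same trick, independent of SVS (the names here are illustrative only):

#include <functional>
#include <future>
#include <iostream>
#include <utility>

// Move-on-copy wrapper: the copy constructor actually moves, so a move-only object
// can live inside a copyable closure.
template <typename T> struct MoC {
    MoC(T&& rhs)
        : obj(std::move(rhs)) {}
    MoC(const MoC& other)
        : obj(std::move(other.obj)) {}
    T& get() { return obj; }
    mutable T obj;
};

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    // Capturing `prom` directly by move would make the lambda move-only, and it could
    // not be stored in std::function. The MoC capture keeps the closure copyable.
    std::function<void()> task = [moc = MoC{std::move(prom)}]() mutable {
        moc.get().set_value(42);
    };
    task();
    std::cout << fut.get() << '\n'; // prints 42
    return 0;
}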
4 changes: 2 additions & 2 deletions examples/cpp/vamana.cpp
@@ -18,8 +18,8 @@

//! [Includes]
// SVS Dependencies
#include "svs/orchestrators/vamana.h" // bulk of the dependencies required.
#include "svs/core/recall.h" // Convenient k-recall@n computation.
#include "svs/orchestrators/vamana.h" // bulk of the dependencies required.
#include "svs/core/recall.h" // Convenient k-recall@n computation.

// Alternative main definition
#include "svsmain.h"
4 changes: 2 additions & 2 deletions include/svs/core/distance/euclidean.h
@@ -38,8 +38,8 @@
// implementation based on the available extension.
//
// The vector width used is added as well in case there is an apparent mismatch between
// Intel(R) AVX extension and the preferred vector width due to performance (sometimes, smaller
// vector widths are faster).
// Intel(R) AVX extension and the preferred vector width due to performance (sometimes,
// smaller vector widths are faster).
//
// Versions for older extensions are implemented as fallbacks.
//
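
A hedged sketch of the dispatch structure the comment above describes: select a kernel at compile time based on the available Intel(R) AVX extension, with a portable fallback. The real implementation uses intrinsics and also weighs the preferred vector width; everything below is illustrative only and simply reuses the scalar loop in every branch.

#include <cstddef>

namespace example {

// Portable reference implementation, always available.
inline float l2_squared_fallback(const float* a, const float* b, size_t n) {
    float sum = 0.0f;
    for (size_t i = 0; i < n; ++i) {
        float d = a[i] - b[i];
        sum += d * d;
    }
    return sum;
}

inline float l2_squared(const float* a, const float* b, size_t n) {
#if defined(__AVX512F__)
    // A 512-bit Intel(R) AVX-512 kernel would be selected here; the sketch reuses the fallback.
    return l2_squared_fallback(a, b, n);
#elif defined(__AVX2__)
    // A 256-bit Intel(R) AVX2 kernel would be selected here; the sketch reuses the fallback.
    return l2_squared_fallback(a, b, n);
#else
    return l2_squared_fallback(a, b, n);
#endif
}

} // namespace example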
12 changes: 6 additions & 6 deletions include/svs/core/distance/simd_utils.h
@@ -134,14 +134,14 @@ namespace simd {
/// 2. Using multiple accumulators can really help in some situations. Floating point
/// arithmetic is not associative, so generally the compiler must strictly obey program
/// semantics when optimizing. This means that if a single accumulator register is used,
/// we introduce a long chain dependency in the instruction stream. Intel(R) AVX functional
/// units are generally pipelined and so have a relatively high latency (4 cycles is common)
/// but with a high throughput.
/// we introduce a long chain dependency in the instruction stream. Intel(R) AVX
/// functional units are generally pipelined and so have a relatively high latency (4
/// cycles is common) but with a high throughput.
///
/// For example: Cascadelake and greater servers have two execution port that offer the
/// bulk of Intel(R) AVX-512 functionality. When fully utilized, SIMD instructions can obtain
/// a throughput of 2 ops per cycle (separate from loads, which can sustain another 2 ops
/// (3 ops on Sapphire Rapids) per cycle.
/// bulk of Intel(R) AVX-512 functionality. When fully utilized, SIMD instructions can
/// obtain a throughput of 2 ops per cycle (separate from loads, which can sustain
/// another 2 ops (3 ops on Sapphire Rapids) per cycle.
///
/// A long dependence on a single accumulation register basically throws all that
/// horse-power away.
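
A scalar sketch of the multiple-accumulator point made above (the actual SVS kernels use Intel(R) AVX intrinsics; plain loops are used here only to show the dependency-chain difference):

#include <cstddef>
#include <vector>

// One accumulator: every addition depends on the result of the previous addition,
// so the pipelined floating-point unit mostly sits idle waiting on that chain.
float sum_single(const std::vector<float>& x) {
    float acc = 0.0f;
    for (size_t i = 0; i < x.size(); ++i) {
        acc += x[i];
    }
    return acc;
}

// Four accumulators: the four chains are independent, so adds can be issued
// back-to-back and the multi-cycle latency of each add is hidden.
float sum_multi(const std::vector<float>& x) {
    float a0 = 0.0f, a1 = 0.0f, a2 = 0.0f, a3 = 0.0f;
    size_t i = 0;
    for (; i + 4 <= x.size(); i += 4) {
        a0 += x[i];
        a1 += x[i + 1];
        a2 += x[i + 2];
        a3 += x[i + 3];
    }
    float acc = (a0 + a1) + (a2 + a3);
    for (; i < x.size(); ++i) {
        acc += x[i]; // remainder elements
    }
    return acc;
}

The two functions may return slightly different results, which is exactly the non-associativity point: under strict floating-point semantics the compiler cannot make this transformation on its own.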
17 changes: 10 additions & 7 deletions include/svs/core/kmeans.h
@@ -48,10 +48,11 @@ Neighbor<size_t> find_nearest(const Query& query, const Data& data) {
return nearest;
}

template <data::ImmutableMemoryDataset Data, data::ImmutableMemoryDataset Centroids, threads::ThreadPool Pool>
double mean_squared_error(
const Data& data, const Centroids& centroids, Pool& threadpool
) {
template <
data::ImmutableMemoryDataset Data,
data::ImmutableMemoryDataset Centroids,
threads::ThreadPool Pool>
double mean_squared_error(const Data& data, const Centroids& centroids, Pool& threadpool) {
threads::SequentialTLS<double> sums(0, threadpool.size());
threads::parallel_for(
threadpool,
@@ -143,7 +144,10 @@ void process_batch(
adjust_centroids.finish();
}

template <data::ImmutableMemoryDataset Data, typename Callback = lib::donothing, threads::ThreadPool Pool>
template <
data::ImmutableMemoryDataset Data,
typename Callback = lib::donothing,
threads::ThreadPool Pool>
data::SimpleData<float> train_impl(
const KMeansParameters& parameters,
const Data& data,
@@ -215,8 +219,7 @@ data::SimpleData<float> train(
ThreadPoolProto threadpool_proto,
Callback&& post_epoch_callback = lib::donothing()
) {
auto threadpool =
threads::as_threadpool(std::move(threadpool_proto));
auto threadpool = threads::as_threadpool(std::move(threadpool_proto));
return train_impl(
parameters, data, threadpool, std::forward<Callback>(post_epoch_callback)
);
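
A sequential sketch of the quantity that mean_squared_error above appears to compute: for each point, the squared L2 distance to its nearest centroid, averaged over the dataset (the SVS version splits the outer loop across the thread pool and accumulates per-thread partial sums). The container types and names below are illustrative, not the SVS API.

#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

double mean_squared_error_sketch(
    const std::vector<std::vector<float>>& data,
    const std::vector<std::vector<float>>& centroids
) {
    double total = 0.0;
    for (const auto& point : data) {
        // Squared distance to the nearest centroid (assumes matching dimensions).
        double best = std::numeric_limits<double>::max();
        for (const auto& c : centroids) {
            double d2 = 0.0;
            for (size_t i = 0; i < point.size(); ++i) {
                double diff = static_cast<double>(point[i]) - c[i];
                d2 += diff * diff;
            }
            best = std::min(best, d2);
        }
        total += best;
    }
    return data.empty() ? 0.0 : total / static_cast<double>(data.size());
}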