// thread_pool.h
// Forked from eteran/cpp-utilities.
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
/**
 * A fixed-size pool of worker threads that execute queued work items in
 * FIFO order.
 *
 * Shutdown protocol: an empty (null) work item is the stop token. A worker
 * that dequeues it puts it back at the end of the queue, so that the next
 * worker sees it too, and then exits its loop.
 *
 * Work items must not throw: an exception escaping a work item leaves the
 * worker thread's entry function, which calls std::terminate.
 */
class thread_pool {
public:
    using work_type = std::function<void()>;

public:
    /**
     * Creates the thread pool with N threads where N is the value of
     * std::thread::hardware_concurrency(), or 1 if that value is 0
     * (the standard allows 0 when the count cannot be determined).
     */
    thread_pool() : thread_pool(default_thread_count()) {
    }

    /**
     * Creates the thread pool with <count> threads
     *
     * A count of 0 creates no workers: queued work is then never executed
     * by the pool itself.
     *
     * If a thread cannot be started, the threads that were already started
     * are stopped and joined before the exception is rethrown.
     *
     * @param count The number of threads in the pool
     */
    // NOTE: deliberately not `explicit`, to stay source-compatible with
    // existing callers that rely on the implicit conversion.
    thread_pool(std::size_t count) {
        threads_.reserve(count);

        try {
            for (std::size_t i = 0; i < count; ++i) {
                threads_.emplace_back([this]() { run(); });
            }
        } catch (...) {
            // destroying a joinable std::thread calls std::terminate, so the
            // workers that did start have to be stopped and joined first
            shutdown();
            throw;
        }
    }

    /**
     * Destroys the thread pool, waits until all outstanding work is complete
     *
     * The stop token is queued behind the work that is already in the queue,
     * so that work is dequeued before the workers exit.
     */
    ~thread_pool() {
        shutdown();
    }

    // The pool owns running threads which capture `this`; it can be neither
    // copied nor moved.
    thread_pool(const thread_pool &) = delete;
    thread_pool &operator=(const thread_pool &) = delete;

public:
    /**
     * Adds a new work item to the pool for execution
     *
     * An empty work item is the stop token: every worker that dequeues it
     * exits, so work queued after it may never run.
     *
     * @param worker the work to do
     */
    void add_worker(work_type worker) {
        {
            std::lock_guard<std::mutex> lock(queue_lock_);
            // move, don't copy: `worker` is already our own by-value copy
            work_queue_.push(std::move(worker));
        }

        // notify after releasing the lock so the woken thread does not
        // immediately block on the mutex we would still be holding
        queue_condition_.notify_one();
    }

    /**
     * Waits until there is at least one work item to do, and then returns
     * the work item after popping it off the queue
     *
     * @return the oldest queued work item; it is empty if that item is the
     *         stop token
     */
    work_type get_worker() {
        std::unique_lock<std::mutex> lock(queue_lock_);
        queue_condition_.wait(lock, [this]() {
            return !work_queue_.empty();
        });

        // the front element is popped right away, so it is safe to steal it
        work_type task = std::move(work_queue_.front());
        work_queue_.pop();
        return task;
    }

private:
    /**
     * @return std::thread::hardware_concurrency(), or 1 when it reports 0
     */
    static std::size_t default_thread_count() {
        const unsigned int hint = std::thread::hardware_concurrency();
        return hint != 0 ? hint : 1;
    }

    /**
     * Body of every worker thread: executes work items until the stop token
     * is dequeued.
     */
    void run() {
        while (true) {
            // this blocks while the queue is empty
            work_type task = get_worker();

            if (!task) {
                // stop token: hand it on to the next worker, then exit
                add_worker(nullptr);
                return;
            }

            task();
        }
    }

    /**
     * Queues the stop token and joins every thread that was started.
     */
    void shutdown() {
        add_worker(nullptr);

        for (std::thread &thread : threads_) {
            thread.join();
        }
    }

private:
    std::vector<std::thread> threads_;
    std::queue<work_type>    work_queue_;
    std::mutex               queue_lock_;
    std::condition_variable  queue_condition_;
};
#endif