// affinity_thread_pool.h
#pragma once
#include <queue>
#include <condition_variable>
#include <vector>
#include <thread>
#include <chrono>
#include <functional>
#include <cassert>
#include <atomic>
#include "thread_pool_pinning.h"
// thread pool with multiple queues of inputs to a function
// each queue is synchronized with condition variables
// and belongs to a specific worker thread
template<typename Arg>
class affinity_thread_pool
{
public:
// definitions of the family of function that can be executed by the worker
typedef std::function<void(const Arg&, std::size_t)> FunctionType;
private:
// number of worker threads
const std::size_t m_number_of_workers;
// thread pool CPU pinning information
const thread_pool_pinning m_pinning;
// time for submitter to wait when queue full (nanoseconds)
// note that the notification from the worker should wake up the submitter
// and this timeout is just used as a fallback mechanism
const unsigned long m_submit_wait_time;
// time to wait when idle (nanoseconds)
// note that the notification from the submitter should wake up the worker
// and this timeout is just used as a fallback mechanism
const unsigned long m_wait_time;
// internal state indicating that processing of existing work should finish
// and no new work should be submitted
volatile bool m_done;
// queues of work items, one for each worker
std::vector<std::queue<Arg>> m_queues;
// mutex used by the empty/full conditions and related variables
mutable std::vector<std::mutex> m_mutexes;
// condition used to signal if a queue is empty or not empty
std::vector<std::condition_variable> m_empty_conds;
// condition used to signal if a queue is full or not full
std::vector<std::condition_variable> m_full_conds;
// the actual worker threads
std::vector<std::thread> m_workers;
// running counter incremented every time submit
// is called. this is used so that round-robin
// submission starts from a different queue each time
std::size_t m_running_counter;
// number of running worker threads
std::atomic<std::size_t> m_number_of_running_workers;
// max size of queue
const std::size_t m_max_queue_size;
// definitions of the family of cleanup functions on Arg
typedef std::function<void(const Arg&)> CleanupFunctionType;
// cleanup function applied to an Arg when FunctionType is not executed on it
// needed when the Arg dtor is not enough (e.g. Arg is an owning pointer)
// since in that case FunctionType would normally do the cleanup
const CleanupFunctionType m_cleanup_function;
// Generic name assigned as a prefix to each worker thread
const std::string m_name;
// this is the function executed by the worker threads
// it pulls items out of the queue until signaled to stop
// it also passes the worker id to F
void worker(FunctionType F, std::size_t worker_id)
{
assert(worker_id < m_number_of_workers);
assert(m_number_of_running_workers < m_number_of_workers);
++m_number_of_running_workers;
auto& q = m_queues[worker_id];
std::condition_variable& empty_cond = m_empty_conds[worker_id];
std::condition_variable& full_cond = m_full_conds[worker_id];
std::mutex& m = m_mutexes[worker_id];
while (true)
{
Arg arg;
{
std::unique_lock<std::mutex> lock(m);
// if queue is not empty we continue regardless of the "done" indication
if (q.empty())
{
// queue is empty
if (m_done)
{
// queue is empty and we are done
return;
}
else
{
// queue is empty but we are not done yet
// wait to get notified, either on the queue becoming non-empty or on being done
while (!m_done && q.empty())
{
try
{
empty_cond.wait_for(lock, std::chrono::nanoseconds(m_wait_time));
}
catch(...)
{
// this should not happen
// nothing we can do, just try again
}
}
if (q.empty())
{
// done and empty
return;
}
}
}
// there is work to do
arg = q.front();
q.pop();
}
// notify that queue is not full
full_cond.notify_one();
// execute the work when the mutex is not locked
F(arg, worker_id);
}
}
// by default we assume the dtor of Arg takes
// care of cleanup
static void default_cleanup_function(const Arg&)
{
// no-op
}
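// Example (illustrative, not part of this header): when Arg is an owning
// raw pointer, a non-default cleanup function can release items that are
// discarded by stop(false). 'task' is a hypothetical type:
//
//   auto cleanup = [](task* const& t) { delete t; };
//   // pass 'cleanup' as the C argument of the constructor below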
public:
// indicating that any worker thread may execute the work
static const int NoAffinity = -1;
// return estimated worker queue size
std::size_t queue_size(std::size_t worker_id) const
{
if (worker_id < m_queues.size())
{
std::unique_lock<std::mutex> lock(m_mutexes[worker_id]);
return m_queues[worker_id].size();
}
return 0;
}
// return the pinning policy used
thread_pool_pinning::policy_t pinning_policy() const
{
return m_pinning.policy();
}
// don't allow copying
affinity_thread_pool& operator=(const affinity_thread_pool&) = delete;
affinity_thread_pool(const affinity_thread_pool&) = delete;
// destructor cleans up the threads
~affinity_thread_pool()
{
// don't wait for the threads
if (!m_done)
{
stop(false);
}
}
//! constructor spawns the threads
affinity_thread_pool(std::size_t number_of_workers,
std::size_t queue_size,
FunctionType F,
const thread_pool_pinning& pinning,
unsigned long submit_wait_time,
unsigned long wait_time,
CleanupFunctionType C = std::bind(&affinity_thread_pool::default_cleanup_function, std::placeholders::_1),
const std::string& name = "") :
m_number_of_workers(number_of_workers),
m_pinning(pinning),
m_submit_wait_time(submit_wait_time),
m_wait_time(wait_time),
m_done(false),
m_queues(number_of_workers),
m_mutexes(number_of_workers),
m_empty_conds(number_of_workers),
m_full_conds(number_of_workers),
m_running_counter(0),
m_number_of_running_workers(0),
m_max_queue_size(queue_size),
m_cleanup_function(C),
m_name(name)
{
for (auto i = 0U; i < m_number_of_workers; ++i)
{
try
{
// start all worker threads
m_workers.push_back(std::thread(&affinity_thread_pool::worker, this, F, i));
// Set the name of the thread based on worker ID
const std::string workerName = m_name + "Worker" + std::to_string(i);
// thread names are limited to 16 chars including the null terminator,
// so only the first 15 chars are passed
pthread_setname_np(m_workers[i].native_handle(), workerName.substr(0,15).c_str());
// get the CPU set for that thread according to the pinning policy
cpu_set_t cpu_set;
if (m_pinning.get_cpu_set(i, &cpu_set))
{
pthread_setaffinity_np(m_workers[i].native_handle(), sizeof(cpu_set_t), &cpu_set);
}
// if pinning is not possible we just don't set any affinity for the thread
}
catch (...)
{
// failed to start a thread
// make sure we don't wait for all threads to start or we would wait forever
m_done = true;
return;
}
}
}
// submit new argument to be processed by the threads
// blocking call
void submit(const Arg& arg, int worker_id = NoAffinity)
{
assert(worker_id < static_cast<int>(m_number_of_workers) && worker_id >= NoAffinity);
std::size_t actual_q_idx = static_cast<std::size_t>(worker_id);
{
if (m_done)
{
return;
}
else if (worker_id == NoAffinity)
{
// no affinity, find a free queue
bool pushed = false;
// don't always start from the first queue
std::size_t q_idx = (++m_running_counter)%m_number_of_workers;
for (auto i = 0U; i < m_number_of_workers; ++i)
{
std::unique_lock<std::mutex> lock(m_mutexes[q_idx]);
if (m_queues[q_idx].size() < m_max_queue_size)
{
m_queues[q_idx].push(arg);
pushed = true;
actual_q_idx = q_idx;
break;
}
// try the next queue
q_idx = (q_idx+1)%m_number_of_workers;
}
if (!pushed)
{
std::unique_lock<std::mutex> lock(m_mutexes[q_idx]);
// all queues were full; wait on an arbitrary queue
while (!m_done && m_queues[q_idx].size() >= m_max_queue_size)
{
try
{
m_full_conds[q_idx].wait_for(lock, std::chrono::nanoseconds(m_submit_wait_time));
}
catch(...)
{
// this should not happen
// nothing we can do, just try again
}
}
if (m_done)
{
// marked as done while we were waiting
return;
}
m_queues[q_idx].push(arg);
actual_q_idx = q_idx;
}
}
else
{
std::unique_lock<std::mutex> lock(m_mutexes[worker_id]);
// has affinity, try using a specific worker
while (!m_done && m_queues[worker_id].size() >= m_max_queue_size)
{
try
{
m_full_conds[worker_id].wait_for(lock, std::chrono::nanoseconds(m_submit_wait_time));
}
catch(...)
{
// this should not happen
// nothing we can do, just try again
}
}
if (m_done)
{
// marked as done while we were waiting
return;
}
m_queues[worker_id].push(arg);
}
}
// the assertion fails if NoAffinity was not converted to an actual queue index
// in that case actual_q_idx is SIZE_MAX (NoAffinity cast to std::size_t)
assert(actual_q_idx < m_number_of_workers);
// notify that queue is not empty
m_empty_conds[actual_q_idx].notify_one();
}
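// Example (illustrative): the affinity semantics of submit, assuming a
// pool of int work items named 'pool':
//
//   pool.submit(1);     // NoAffinity: round robin over the per-worker queues
//   pool.submit(2, 3);  // pinned: only worker 3 may process this item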
// submit new argument to be processed by the threads if queue has space
// non-blocking call
bool try_submit(const Arg& arg, int worker_id = NoAffinity)
{
std::size_t actual_q_idx = static_cast<std::size_t>(worker_id);
assert(worker_id < static_cast<int>(m_number_of_workers) && worker_id >= NoAffinity);
{
if (m_done)
{
return false;
}
else if (worker_id == NoAffinity)
{
// no affinity, find a free queue
bool pushed = false;
// don't always start from the first queue
std::size_t q_idx = (++m_running_counter)%m_number_of_workers;
for (auto i = 0U; i < m_number_of_workers; ++i)
{
std::unique_lock<std::mutex> lock(m_mutexes[q_idx]);
if (m_queues[q_idx].size() < m_max_queue_size)
{
m_queues[q_idx].push(arg);
pushed = true;
actual_q_idx = q_idx;
break;
}
// try the next queue
q_idx = (q_idx+1)%m_number_of_workers;
}
if (!pushed)
{
// all queues were busy
return false;
}
}
else
{
std::unique_lock<std::mutex> lock(m_mutexes[worker_id]);
// has affinity, try using a specific worker
if (m_queues[worker_id].size() >= m_max_queue_size)
{
return false;
}
else
{
m_queues[worker_id].push(arg);
}
}
}
// the assertion fails if NoAffinity was not converted to an actual queue index
// in that case actual_q_idx is SIZE_MAX (NoAffinity cast to std::size_t)
assert(actual_q_idx < m_number_of_workers);
// notify that queue is not empty
m_empty_conds[actual_q_idx].notify_one();
return true;
}
// stop all threads, may or may not wait to finish
void stop(bool wait)
{
{
// take the locks of all workers to make sure worker threads are either waiting
// or processing, and will check on m_done before next iteration
std::vector<std::unique_lock<std::mutex>> all_locks;
for (auto& m : m_mutexes)
{
all_locks.emplace_back(m);
}
if (m_done)
{
return;
}
// don't allow new submissions
m_done = true;
if (!wait)
{
// drain the queues without running F on the items
for (auto& q : m_queues)
{
while (!q.empty())
{
m_cleanup_function(q.front());
q.pop();
}
}
}
}
// notify all that we are done
for (auto& cond : m_empty_conds)
{
cond.notify_all();
}
for (auto& cond : m_full_conds)
{
cond.notify_all();
}
for (auto& worker : m_workers)
{
// join on threads until they actually finish
try
{
worker.join();
}
catch (...)
{
// could happen if F is deadlocked or if not all threads actually started
// not much we can do here
}
}
}
};
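// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, kept in a comment so the header stays
// declaration-only). It assumes thread_pool_pinning is default constructible
// with a "no pinning" policy; check thread_pool_pinning.h for the actual
// constructors:
//
//   #include "affinity_thread_pool.h"
//   #include <cstdio>
//
//   int main()
//   {
//       // 4 workers, each with a queue of up to 128 items; submitters and
//       // workers fall back to a 1ms poll if a notification is missed
//       affinity_thread_pool<int> pool(
//           4, 128,
//           [](const int& job, std::size_t id)
//           {
//               std::printf("worker %zu got job %d\n", id, job);
//           },
//           thread_pool_pinning(), // assumed default: no pinning
//           1000000UL,             // submit_wait_time in nanoseconds
//           1000000UL);            // wait_time in nanoseconds
//       pool.submit(1);            // blocks while all queues are full
//       if (!pool.try_submit(2))   // never blocks
//       {
//           // all queues were full; drop or retry
//       }
//       pool.stop(true);           // let workers drain the queues, then join
//   }
// ---------------------------------------------------------------------------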