Skip to content

Commit

Permalink
Merge pull request #5075 from afeher/drqs-175968161-add-moveing-jobs-…
Browse files Browse the repository at this point in the history
…to-threadpool-reinstating

Add move for jobs in bdlmt_multiqueuethreadpool.
  • Loading branch information
cppguru authored and GitHub Enterprise committed Nov 5, 2024
2 parents 77fe726 + 36f4609 commit dc3c96d
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 36 deletions.
20 changes: 10 additions & 10 deletions groups/bdl/bdlmt/bdlmt_multiqueuethreadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ BSLS_IDENT_RCSID(bdlmt_multiqueuethreadpool_cpp,"$Id$ $CSID$")
#include <bdlf_bind.h>
#include <bdlf_memfn.h>

#include <bslma_default.h>

#include <bslmt_latch.h>
#include <bslmt_lockguard.h>
#include <bslmt_threadutil.h>
#include <bslmt_writelockguard.h>

#include <bslma_default.h>

#include <bsls_assert.h>
#include <bsls_log.h>
#include <bsls_stackaddressutil.h>
Expand Down Expand Up @@ -201,14 +201,14 @@ void MultiQueueThreadPool_Queue::executeFront()
functors.reserve(count);

for (bsl::size_t i = 0; i < count; ++i) {
functors.emplace_back(d_list.front());
functors.emplace_back(bslmf::MovableRefUtil::move(d_list.front()));
d_list.pop_front();
}

d_processor = bslmt::ThreadUtil::self();
}

// Note that the appropriate 'd_runState' is a bit ambigoues at this point.
// Note that the appropriate 'd_runState' is a bit ambiguous at this point.
// Since there is nothing scheduled in the thread pool, the state should
// arguably be 'e_NOT_SCHEDULED'. However, allowing work to be scheduled
// during the execution of the 'functors' would be a bug. Instead of
Expand Down Expand Up @@ -319,12 +319,12 @@ int MultiQueueThreadPool_Queue::initiatePause()
return 0;
}

int MultiQueueThreadPool_Queue::pushBack(const Job& functor)
int MultiQueueThreadPool_Queue::pushBack(bslmf::MovableRef<Job> functor)
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

if (e_ENQUEUING_ENABLED == d_enqueueState) {
d_list.push_back(functor);
d_list.push_back(bslmf::MovableRefUtil::move(functor));

// Note that the following should match what is in 'pushFront'.

Expand All @@ -333,7 +333,7 @@ int MultiQueueThreadPool_Queue::pushBack(const Job& functor)

++d_multiQueueThreadPool_p->d_numActiveQueues;

int status = d_multiQueueThreadPool_p->d_threadPool_p->
const int status = d_multiQueueThreadPool_p->d_threadPool_p->
enqueueJob(d_processingCb);

BSLS_ASSERT_OPT(0 == status); (void)status;
Expand All @@ -345,12 +345,12 @@ int MultiQueueThreadPool_Queue::pushBack(const Job& functor)
return 1;
}

int MultiQueueThreadPool_Queue::pushFront(const Job& functor)
int MultiQueueThreadPool_Queue::pushFront(bslmf::MovableRef<Job> functor)
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

if (e_ENQUEUING_ENABLED == d_enqueueState) {
d_list.push_front(functor);
d_list.push_front(bslmf::MovableRefUtil::move(functor));

// Note that the following should match what is in 'pushBack'.

Expand All @@ -360,7 +360,7 @@ int MultiQueueThreadPool_Queue::pushFront(const Job& functor)

++d_multiQueueThreadPool_p->d_numActiveQueues;

int status = d_multiQueueThreadPool_p->d_threadPool_p->
const int status = d_multiQueueThreadPool_p->d_threadPool_p->
enqueueJob(d_processingCb);

BSLS_ASSERT_OPT(0 == status); (void)status;
Expand Down
61 changes: 43 additions & 18 deletions groups/bdl/bdlmt/bdlmt_multiqueuethreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,17 +347,19 @@ BSLS_IDENT("$Id: $")

#include <bdlscm_version.h>

#include <bslmt_lockguard.h>
#include <bdlmt_threadpool.h>

#include <bdlcc_objectpool.h>

#include <bdlmt_threadpool.h>

#include <bslma_allocator.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_latch.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>
#include <bslmt_mutexassert.h>
#include <bslmt_readerwritermutex.h>
Expand All @@ -371,8 +373,6 @@ BSLS_IDENT("$Id: $")
#include <bsl_functional.h>
#include <bsl_map.h>

#include <bslmt_latch.h>

namespace BloombergLP {
namespace bdlmt {

Expand Down Expand Up @@ -515,12 +515,16 @@ class MultiQueueThreadPool_Queue {
int initiatePause();

/// Enqueue the specified `functor` at the end of this queue. Return 0
/// on success, and a non-zero value if enqueuing is disabled.
int pushBack(const Job& functor);
/// on success, and a non-zero value if enqueuing is disabled. The value
/// of `functor` becomes unspecified but valid, and its allocator remains
/// unchanged.
int pushBack(bslmf::MovableRef<Job> functor);

/// Add the specified `functor` at the front of this queue. Return 0 on
/// success, and a non-zero value if enqueuing is disabled.
int pushFront(const Job& functor);
/// success, and a non-zero value if enqueuing is disabled. The value of
/// `functor` becomes unspecified but valid, and its allocator remains
/// unchanged.
int pushFront(bslmf::MovableRef<Job> functor);

/// Reset this queue to its initial state. The behavior is undefined
/// unless this queue's lock is in an unlocked state. After this method
Expand Down Expand Up @@ -715,11 +719,13 @@ class MultiQueueThreadPool {

/// Add the specified `functor` at the front of the queue specified by
/// `id`. Return 0 if added successfully, and a non-zero value if
/// queuing is disabled. The behavior is undefined unless `functor` is
/// bound. Note that the position of `functor` relative to any
/// currently queued jobs is unspecified unless the queue is currently
/// paused.
int addJobAtFront(int id, const Job& functor);
/// queuing is disabled. If passed by movable reference, the value of
/// `functor` becomes unspecified but valid, and its allocator remains
/// unchanged. The behavior is undefined unless `functor` is bound. Note
/// that the position of `functor` relative to any currently queued jobs is
/// unspecified unless the queue is currently paused.
int addJobAtFront(int id, const Job& functor);
int addJobAtFront(int id, bslmf::MovableRef<Job> functor);

/// Create a queue with unlimited capacity and a default number of
/// initial elements. Return a non-zero queue ID. The queue ID can be
Expand Down Expand Up @@ -780,8 +786,11 @@ class MultiQueueThreadPool {

/// Enqueue the specified `functor` to the queue specified by `id`.
/// Return 0 if enqueued successfully, and a non-zero value if queuing
/// is disabled. The behavior is undefined unless `functor` is bound.
int enqueueJob(int id, const Job& functor);
/// is disabled. If passed by movable reference, the value of `functor`
/// becomes unspecified but valid, and its allocator remains unchanged.
/// The behavior is undefined unless `functor` is bound.
int enqueueJob(int id, const Job& functor);
int enqueueJob(int id, bslmf::MovableRef<Job> functor);

/// Enable enqueuing to the queue associated with the specified `id`.
/// Return 0 on success, and a non-zero value otherwise. It is an error
Expand Down Expand Up @@ -972,6 +981,14 @@ int MultiQueueThreadPool::findIfUsable(int id,
// MANIPULATORS
inline
int MultiQueueThreadPool::addJobAtFront(int id, const Job& functor)
{
Job temp(bsl::allocator_arg, d_allocator_p, functor);

return addJobAtFront(id, bslmf::MovableRefUtil::move(temp));
}

inline
int MultiQueueThreadPool::addJobAtFront(int id, bslmf::MovableRef<Job> functor)
{
bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

Expand All @@ -981,7 +998,7 @@ int MultiQueueThreadPool::addJobAtFront(int id, const Job& functor)
return 1; // RETURN
}

if (0 == queue->pushFront(functor)) {
if (0 == queue->pushFront(bslmf::MovableRefUtil::move(functor))) {
++d_numEnqueued;
return 0; // RETURN
}
Expand All @@ -991,6 +1008,14 @@ int MultiQueueThreadPool::addJobAtFront(int id, const Job& functor)

inline
int MultiQueueThreadPool::enqueueJob(int id, const Job& functor)
{
Job temp(bsl::allocator_arg, d_allocator_p, functor);

return enqueueJob(id, bslmf::MovableRefUtil::move(temp));
}

inline
int MultiQueueThreadPool::enqueueJob(int id, bslmf::MovableRef<Job> functor)
{
bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

Expand All @@ -1000,7 +1025,7 @@ int MultiQueueThreadPool::enqueueJob(int id, const Job& functor)
return 1; // RETURN
}

if (0 == queue->pushBack(functor)) {
if (0 == queue->pushBack(bslmf::MovableRefUtil::move(functor))) {
++d_numEnqueued;
return 0; // RETURN
}
Expand Down
Loading

0 comments on commit dc3c96d

Please sign in to comment.