proxygen
folly::CPUThreadPoolExecutor Class Reference

#include <CPUThreadPoolExecutor.h>

Inheritance diagram for folly::CPUThreadPoolExecutor:
folly::ThreadPoolExecutor folly::DefaultKeepAliveExecutor folly::Executor

Classes

struct  CPUTask
 

Public Member Functions

 CPUThreadPoolExecutor (size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
 
 CPUThreadPoolExecutor (std::pair< size_t, size_t > numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
 
 CPUThreadPoolExecutor (size_t numThreads)
 
 CPUThreadPoolExecutor (size_t numThreads, std::shared_ptr< ThreadFactory > threadFactory)
 
 CPUThreadPoolExecutor (std::pair< size_t, size_t > numThreads, std::shared_ptr< ThreadFactory > threadFactory)
 
 CPUThreadPoolExecutor (size_t numThreads, int8_t numPriorities, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
 
 CPUThreadPoolExecutor (size_t numThreads, int8_t numPriorities, size_t maxQueueSize, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
 
 ~CPUThreadPoolExecutor () override
 
void add (Func func) override
 
void add (Func func, std::chrono::milliseconds expiration, Func expireCallback=nullptr) override
 
void addWithPriority (Func func, int8_t priority) override
 
void add (Func func, int8_t priority, std::chrono::milliseconds expiration, Func expireCallback=nullptr)
 
size_t getTaskQueueSize () const
 
uint8_t getNumPriorities () const override
 
- Public Member Functions inherited from folly::ThreadPoolExecutor
 ThreadPoolExecutor (size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
 
 ~ThreadPoolExecutor () override
 
void setThreadFactory (std::shared_ptr< ThreadFactory > threadFactory)
 
std::shared_ptr< ThreadFactory > getThreadFactory ()
 
size_t numThreads ()
 
void setNumThreads (size_t numThreads)
 
size_t numActiveThreads ()
 
void stop ()
 
void join ()
 
PoolStats getPoolStats ()
 
size_t getPendingTaskCount ()
 
std::string getName ()
 
void subscribeToTaskStats (TaskStatsCallback cb)
 
void addObserver (std::shared_ptr< Observer >)
 
void removeObserver (std::shared_ptr< Observer >)
 
void setThreadDeathTimeout (std::chrono::milliseconds timeout)
 
- Public Member Functions inherited from folly::DefaultKeepAliveExecutor
 DefaultKeepAliveExecutor ()
 
virtual ~DefaultKeepAliveExecutor ()
 
folly::Executor::KeepAlive weakRef ()
 
- Public Member Functions inherited from folly::Executor
virtual ~Executor ()
 

Static Public Attributes

static const size_t kDefaultMaxQueueSize = 1 << 14
 
- Static Public Attributes inherited from folly::Executor
static const int8_t LO_PRI = SCHAR_MIN
 
static const int8_t MID_PRI = 0
 
static const int8_t HI_PRI = SCHAR_MAX
 

Protected Member Functions

BlockingQueue< CPUTask > * getTaskQueue ()
 
- Protected Member Functions inherited from folly::ThreadPoolExecutor
void addThreads (size_t n)
 
void removeThreads (size_t n, bool isJoin)
 
void joinStoppedThreads (size_t n)
 
virtual ThreadPtr makeThread ()
 
void ensureActiveThreads ()
 
void ensureJoined ()
 
bool minActive ()
 
bool tryTimeoutThread ()
 
void joinKeepAliveOnce ()
 
- Protected Member Functions inherited from folly::DefaultKeepAliveExecutor
void joinKeepAlive ()
 

Private Member Functions

void threadRun (ThreadPtr thread) override
 
void stopThreads (size_t n) override
 
size_t getPendingTaskCountImpl () override
 
bool tryDecrToStop ()
 
bool taskShouldStop (folly::Optional< CPUTask > &)
 

Private Attributes

std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
 
std::atomic< ssize_t > threadsToStop_ {0}
 

Additional Inherited Members

- Public Types inherited from folly::ThreadPoolExecutor
using TaskStatsCallback = std::function< void(TaskStats)>
 
- Static Public Member Functions inherited from folly::ThreadPoolExecutor
static void withAll (FunctionRef< void(ThreadPoolExecutor &)> f)
 
- Static Public Member Functions inherited from folly::Executor
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT *executor)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT &executor)
 
- Protected Types inherited from folly::ThreadPoolExecutor
typedef std::shared_ptr< Thread > ThreadPtr
 
- Static Protected Member Functions inherited from folly::ThreadPoolExecutor
static void runTask (const ThreadPtr &thread, Task &&task)
 
- Static Protected Member Functions inherited from folly::Executor
template<typename ExecutorT >
static bool isKeepAliveDummy (const KeepAlive< ExecutorT > &keepAlive)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > makeKeepAlive (ExecutorT *executor)
 
- Protected Attributes inherited from folly::ThreadPoolExecutor
std::shared_ptr< ThreadFactory > threadFactory_
 
const bool isWaitForAll_
 
ThreadList threadList_
 
SharedMutex threadListLock_
 
StoppedThreadQueue stoppedThreads_
 
std::atomic< bool > isJoin_ {false}
 
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
 
std::vector< std::shared_ptr< Observer > > observers_
 
folly::ThreadPoolListHook threadPoolHook_
 
std::atomic< size_t > maxThreads_ {0}
 
std::atomic< size_t > minThreads_ {0}
 
std::atomic< size_t > activeThreads_ {0}
 
std::atomic< size_t > threadsToJoin_ {0}
 
std::chrono::milliseconds threadTimeout_ {0}
 
bool keepAliveJoined_ {false}
 

Detailed Description

A thread pool for CPU-bound tasks.

Note
A single queue backed by folly/LifoSem and folly/MPMC queue. Because of this, contention can be quite high, since all the worker threads and all the producer threads hit the same queue. MPMC queue excels in this situation but dictates a max queue size.
The default queue throws when full (folly::QueueBehaviorIfFull::THROW), so add() can fail. Furthermore, join() can also fail if the queue is full, because it enqueues numThreads poison tasks to stop the threads. If join() must be guaranteed to succeed, PriorityLifoSemMPMCQueue can be used instead, initializing the lowest priority's (LO_PRI) capacity to at least numThreads. Poisons use LO_PRI, so if that priority is not used for any user task, join() is guaranteed not to encounter a full queue.
If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and tasks executing on a given thread pool schedule more tasks, deadlock is possible if the queue becomes full. Deadlock is also possible if there is a circular dependency among multiple thread pools with blocking queues. To avoid this situation, use non-blocking queue(s), or schedule tasks only from threads not belonging to the given thread pool(s), or use folly::IOThreadPoolExecutor.
LifoSem wakes up threads in LIFO order - i.e. only as few threads as necessary are running, and we always try to reuse the same few threads for better cache locality. Inactive threads have their stack madvised away. This works quite well in combination with LifoSem - it almost doesn't matter if more threads than are necessary are specified at startup.
Supports priorities - priorities are implemented as multiple queues, and each worker thread checks the highest priority queue first. Threads themselves don't have priorities set, so a series of long-running low-priority tasks could still hog all the threads (at last check, pthreads thread priorities didn't work very well).

Definition at line 63 of file CPUThreadPoolExecutor.h.

Constructor & Destructor Documentation

folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( size_t  numThreads,
std::unique_ptr< BlockingQueue< CPUTask >>  taskQueue,
std::shared_ptr< ThreadFactory >  threadFactory = std::make_shared<NamedThreadFactory>("CPUThreadPool") 
)

Definition at line 31 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::setNumThreads().

36  numThreads,
37  FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
38  std::move(threadFactory)),
39  taskQueue_(std::move(taskQueue)) {
40  setNumThreads(numThreads);
41 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
ThreadPoolExecutor(size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void setNumThreads(size_t numThreads)
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( std::pair< size_t, size_t >  numThreads,
std::unique_ptr< BlockingQueue< CPUTask >>  taskQueue,
std::shared_ptr< ThreadFactory >  threadFactory = std::make_shared<NamedThreadFactory>("CPUThreadPool") 
)

Definition at line 43 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::setNumThreads().

48  numThreads.first,
49  numThreads.second,
50  std::move(threadFactory)),
51  taskQueue_(std::move(taskQueue)) {
53 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
ThreadPoolExecutor(size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void setNumThreads(size_t numThreads)
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( size_t  numThreads)
explicit

Definition at line 73 of file CPUThreadPoolExecutor.cpp.

75  numThreads,
76  std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( size_t  numThreads,
std::shared_ptr< ThreadFactory >  threadFactory 
)

Definition at line 55 of file CPUThreadPoolExecutor.cpp.

59  numThreads,
60  std::make_unique<LifoSemMPMCQueue<CPUTask>>(
62  std::move(threadFactory)) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
static const size_t kDefaultMaxQueueSize
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( std::pair< size_t, size_t >  numThreads,
std::shared_ptr< ThreadFactory >  threadFactory 
)

Definition at line 64 of file CPUThreadPoolExecutor.cpp.

68  numThreads,
69  std::make_unique<LifoSemMPMCQueue<CPUTask>>(
71  std::move(threadFactory)) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
static const size_t kDefaultMaxQueueSize
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( size_t  numThreads,
int8_t  numPriorities,
std::shared_ptr< ThreadFactory >  threadFactory = std::make_shared<NamedThreadFactory>("CPUThreadPool") 
)

Definition at line 78 of file CPUThreadPoolExecutor.cpp.

83  numThreads,
84  std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
85  numPriorities,
87  std::move(threadFactory)) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
static const size_t kDefaultMaxQueueSize
folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor ( size_t  numThreads,
int8_t  numPriorities,
size_t  maxQueueSize,
std::shared_ptr< ThreadFactory >  threadFactory = std::make_shared<NamedThreadFactory>("CPUThreadPool") 
)

Definition at line 89 of file CPUThreadPoolExecutor.cpp.

95  numThreads,
96  std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
97  numPriorities,
98  maxQueueSize),
99  std::move(threadFactory)) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
folly::CPUThreadPoolExecutor::~CPUThreadPoolExecutor ( )
override

Definition at line 101 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::stop(), and threadsToStop_.

101  {
102  stop();
103  CHECK(threadsToStop_ == 0);
104 }
std::atomic< ssize_t > threadsToStop_

Member Function Documentation

void folly::CPUThreadPoolExecutor::add ( Func  )
overridevirtual

Enqueue a function to be executed by this executor. This and all variants must be threadsafe.

Implements folly::ThreadPoolExecutor.

Definition at line 106 of file CPUThreadPoolExecutor.cpp.

References folly::gen::move.

Referenced by addWithPriority(), and TEST().

106  {
107  add(std::move(func), std::chrono::milliseconds(0));
108 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void folly::CPUThreadPoolExecutor::add ( Func  func,
std::chrono::milliseconds  expiration,
Func  expireCallback = nullptr 
)
overridevirtual

Implements folly::ThreadPoolExecutor.

Definition at line 110 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ensureActiveThreads(), folly::gen::move, and taskQueue_.

113  {
114  auto result = taskQueue_->add(
115  CPUTask(std::move(func), expiration, std::move(expireCallback)));
116  if (!result.reusedThread) {
117  ensureActiveThreads();
118  }
119 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static void expiration()
void folly::CPUThreadPoolExecutor::add ( Func  func,
int8_t  priority,
std::chrono::milliseconds  expiration,
Func  expireCallback = nullptr 
)

Definition at line 125 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ensureActiveThreads(), getNumPriorities(), folly::gen::move, and taskQueue_.

129  {
130  CHECK(getNumPriorities() > 0);
131  auto result = taskQueue_->addWithPriority(
132  CPUTask(std::move(func), expiration, std::move(expireCallback)),
133  priority);
134  if (!result.reusedThread) {
135  ensureActiveThreads();
136  }
137 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
uint8_t getNumPriorities() const override
static void expiration()
void folly::CPUThreadPoolExecutor::addWithPriority ( Func  ,
int8_t  priority 
)
overridevirtual

Enqueue a function with a given priority, where 0 is the medium priority. This is up to the implementation to enforce.

Reimplemented from folly::Executor.

Definition at line 121 of file CPUThreadPoolExecutor.cpp.

References add(), and folly::gen::move.

121  {
122  add(std::move(func), priority, std::chrono::milliseconds(0));
123 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
uint8_t folly::CPUThreadPoolExecutor::getNumPriorities ( ) const
overridevirtual

Reimplemented from folly::Executor.

Definition at line 139 of file CPUThreadPoolExecutor.cpp.

References taskQueue_.

Referenced by add().

139  {
140  return taskQueue_->getNumPriorities();
141 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
size_t folly::CPUThreadPoolExecutor::getPendingTaskCountImpl ( )
overrideprivatevirtual

Implements folly::ThreadPoolExecutor.

Definition at line 218 of file CPUThreadPoolExecutor.cpp.

References taskQueue_.

218  {
219  return taskQueue_->size();
220 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
BlockingQueue< CPUThreadPoolExecutor::CPUTask > * folly::CPUThreadPoolExecutor::getTaskQueue ( )
protected

Definition at line 148 of file CPUThreadPoolExecutor.cpp.

References taskQueue_.

148  {
149  return taskQueue_.get();
150 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
size_t folly::CPUThreadPoolExecutor::getTaskQueueSize ( ) const

Definition at line 143 of file CPUThreadPoolExecutor.cpp.

References taskQueue_.

143  {
144  return taskQueue_->size();
145 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
void folly::CPUThreadPoolExecutor::stopThreads ( size_t  n)
overrideprivatevirtual

Implements folly::ThreadPoolExecutor.

Definition at line 210 of file CPUThreadPoolExecutor.cpp.

References i, folly::Executor::LO_PRI, taskQueue_, and threadsToStop_.

210  {
211  threadsToStop_ += n;
212  for (size_t i = 0; i < n; i++) {
213  taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
214  }
215 }
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
static const int8_t LO_PRI
Definition: Executor.h:48
std::atomic< ssize_t > threadsToStop_
bool folly::CPUThreadPoolExecutor::taskShouldStop ( folly::Optional< CPUTask > &  task)
private

Definition at line 162 of file CPUThreadPoolExecutor.cpp.

References tryDecrToStop(), and folly::ThreadPoolExecutor::tryTimeoutThread().

Referenced by threadRun().

162  {
163  if (tryDecrToStop()) {
164  return true;
165  }
166  if (task) {
167  return false;
168  } else {
169  return tryTimeoutThread();
170  }
171  return true;
172 }
void folly::CPUThreadPoolExecutor::threadRun ( ThreadPtr  thread)
overrideprivatevirtual

Implements folly::ThreadPoolExecutor.

Definition at line 174 of file CPUThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::StoppedThreadQueue::add(), folly::ThreadPoolExecutor::isJoin_, folly::gen::move, folly::ThreadPoolExecutor::observers_, folly::ThreadPoolListHook::registerThread(), folly::ThreadPoolExecutor::ThreadList::remove(), folly::ThreadPoolExecutor::runTask(), folly::ThreadPoolExecutor::stoppedThreads_, taskQueue_, taskShouldStop(), folly::ThreadPoolExecutor::threadList_, folly::ThreadPoolExecutor::threadListLock_, folly::ThreadPoolExecutor::threadPoolHook_, threadsToStop_, folly::ThreadPoolExecutor::threadTimeout_, tryDecrToStop(), and UNLIKELY.

174  {
176 
177  thread->startupBaton.post();
178  while (true) {
179  auto task = taskQueue_->try_take_for(threadTimeout_);
180  // Handle thread stopping, either by task timeout, or
181  // by 'poison' task added in join() or stop().
182  if (UNLIKELY(!task || task.value().poison)) {
183  // Actually remove the thread from the list.
184  SharedMutex::WriteHolder w{&threadListLock_};
185  if (taskShouldStop(task)) {
186  for (auto& o : observers_) {
187  o->threadStopped(thread.get());
188  }
189  threadList_.remove(thread);
190  stoppedThreads_.add(thread);
191  return;
192  } else {
193  continue;
194  }
195  }
196 
197  runTask(thread, std::move(task.value()));
198 
199  if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
200  SharedMutex::WriteHolder w{&threadListLock_};
201  if (tryDecrToStop()) {
202  threadList_.remove(thread);
203  stoppedThreads_.add(thread);
204  return;
205  }
206  }
207  }
208 }
static void runTask(const ThreadPtr &thread, Task &&task)
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
std::chrono::milliseconds threadTimeout_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::ThreadPoolListHook threadPoolHook_
std::atomic< bool > isJoin_
bool taskShouldStop(folly::Optional< CPUTask > &)
std::vector< std::shared_ptr< Observer > > observers_
void remove(const ThreadPtr &state)
BlockingQueueAddResult add(ThreadPtr item) override
StoppedThreadQueue stoppedThreads_
#define UNLIKELY(x)
Definition: Likely.h:48
std::atomic< ssize_t > threadsToStop_
bool folly::CPUThreadPoolExecutor::tryDecrToStop ( )
private

Definition at line 153 of file CPUThreadPoolExecutor.cpp.

References threadsToStop_.

Referenced by taskShouldStop(), and threadRun().

153  {
154  auto toStop = threadsToStop_.load(std::memory_order_relaxed);
155  if (toStop <= 0) {
156  return false;
157  }
158  threadsToStop_.store(toStop - 1, std::memory_order_relaxed);
159  return true;
160 }
std::atomic< ssize_t > threadsToStop_

Member Data Documentation

const size_t folly::CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14
static

Definition at line 136 of file CPUThreadPoolExecutor.h.

std::unique_ptr<BlockingQueue<CPUTask> > folly::CPUThreadPoolExecutor::taskQueue_
private
std::atomic<ssize_t> folly::CPUThreadPoolExecutor::threadsToStop_ {0}
private

The documentation for this class was generated from the following files: