proxygen
folly::ThreadPoolExecutor Class Referenceabstract

#include <ThreadPoolExecutor.h>

Inheritance diagram for folly::ThreadPoolExecutor:
folly::DefaultKeepAliveExecutor folly::Executor folly::CPUThreadPoolExecutor folly::IOThreadPoolExecutor

Classes

class  Observer
 
struct  PoolStats
 
class  StoppedThreadQueue
 
struct  Task
 
struct  TaskStats
 
struct  TaskStatsCallbackRegistry
 
struct  Thread
 
class  ThreadHandle
 
class  ThreadList
 

Public Types

using TaskStatsCallback = std::function< void(TaskStats)>
 

Public Member Functions

 ThreadPoolExecutor (size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
 
 ~ThreadPoolExecutor () override
 
void add (Func func) override=0
 
virtual void add (Func func, std::chrono::milliseconds expiration, Func expireCallback)=0
 
void setThreadFactory (std::shared_ptr< ThreadFactory > threadFactory)
 
std::shared_ptr< ThreadFactorygetThreadFactory ()
 
size_t numThreads ()
 
void setNumThreads (size_t numThreads)
 
size_t numActiveThreads ()
 
void stop ()
 
void join ()
 
PoolStats getPoolStats ()
 
size_t getPendingTaskCount ()
 
std::string getName ()
 
void subscribeToTaskStats (TaskStatsCallback cb)
 
void addObserver (std::shared_ptr< Observer >)
 
void removeObserver (std::shared_ptr< Observer >)
 
void setThreadDeathTimeout (std::chrono::milliseconds timeout)
 
- Public Member Functions inherited from folly::DefaultKeepAliveExecutor
 DefaultKeepAliveExecutor ()
 
virtual ~DefaultKeepAliveExecutor ()
 
folly::Executor::KeepAlive weakRef ()
 
- Public Member Functions inherited from folly::Executor
virtual ~Executor ()
 
virtual void addWithPriority (Func, int8_t priority)
 
virtual uint8_t getNumPriorities () const
 

Static Public Member Functions

static void withAll (FunctionRef< void(ThreadPoolExecutor &)> f)
 
- Static Public Member Functions inherited from folly::Executor
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT *executor)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT &executor)
 

Protected Types

typedef std::shared_ptr< ThreadThreadPtr
 

Protected Member Functions

void addThreads (size_t n)
 
void removeThreads (size_t n, bool isJoin)
 
virtual void threadRun (ThreadPtr thread)=0
 
virtual void stopThreads (size_t n)=0
 
void joinStoppedThreads (size_t n)
 
virtual ThreadPtr makeThread ()
 
virtual size_t getPendingTaskCountImpl ()=0
 
void ensureActiveThreads ()
 
void ensureJoined ()
 
bool minActive ()
 
bool tryTimeoutThread ()
 
void joinKeepAliveOnce ()
 
- Protected Member Functions inherited from folly::DefaultKeepAliveExecutor
void joinKeepAlive ()
 

Static Protected Member Functions

static void runTask (const ThreadPtr &thread, Task &&task)
 
- Static Protected Member Functions inherited from folly::Executor
template<typename ExecutorT >
static bool isKeepAliveDummy (const KeepAlive< ExecutorT > &keepAlive)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > makeKeepAlive (ExecutorT *executor)
 

Protected Attributes

std::shared_ptr< ThreadFactorythreadFactory_
 
const bool isWaitForAll_
 
ThreadList threadList_
 
SharedMutex threadListLock_
 
StoppedThreadQueue stoppedThreads_
 
std::atomic< bool > isJoin_ {false}
 
std::shared_ptr< TaskStatsCallbackRegistrytaskStatsCallbacks_
 
std::vector< std::shared_ptr< Observer > > observers_
 
folly::ThreadPoolListHook threadPoolHook_
 
std::atomic< size_t > maxThreads_ {0}
 
std::atomic< size_t > minThreads_ {0}
 
std::atomic< size_t > activeThreads_ {0}
 
std::atomic< size_t > threadsToJoin_ {0}
 
std::chrono::milliseconds threadTimeout_ {0}
 
bool keepAliveJoined_ {false}
 

Additional Inherited Members

- Static Public Attributes inherited from folly::Executor
static const int8_t LO_PRI = SCHAR_MIN
 
static const int8_t MID_PRI = 0
 
static const int8_t HI_PRI = SCHAR_MAX
 

Detailed Description

Definition at line 55 of file ThreadPoolExecutor.h.

Member Typedef Documentation

Definition at line 123 of file ThreadPoolExecutor.h.

typedef std::shared_ptr<Thread> folly::ThreadPoolExecutor::ThreadPtr
protected

Definition at line 189 of file ThreadPoolExecutor.h.

Constructor & Destructor Documentation

folly::ThreadPoolExecutor::ThreadPoolExecutor ( size_t  maxThreads,
size_t  minThreads,
std::shared_ptr< ThreadFactory threadFactory,
bool  isWaitForAll = false 
)
explicit

Definition at line 37 of file ThreadPoolExecutor.cpp.

References folly::getSyncVecThreadPoolExecutors().

42  : threadFactory_(std::move(threadFactory)),
43  isWaitForAll_(isWaitForAll),
44  taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
45  threadPoolHook_("folly::ThreadPoolExecutor"),
46  minThreads_(minThreads),
47  threadTimeout_(FLAGS_threadtimeout_ms) {
48  getSyncVecThreadPoolExecutors()->push_back(this);
49 }
std::chrono::milliseconds threadTimeout_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::ThreadPoolListHook threadPoolHook_
std::shared_ptr< ThreadFactory > threadFactory_
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
SyncVecThreadPoolExecutors & getSyncVecThreadPoolExecutors()
std::atomic< size_t > minThreads_
folly::ThreadPoolExecutor::~ThreadPoolExecutor ( )
override

Definition at line 51 of file ThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ThreadList::get(), folly::getSyncVecThreadPoolExecutors(), joinKeepAliveOnce(), and threadList_.

51  {
53  CHECK_EQ(0, threadList_.get().size());
54  getSyncVecThreadPoolExecutors().withWLock([this](auto& tpe) {
55  tpe.erase(std::remove(tpe.begin(), tpe.end(), this), tpe.end());
56  });
57 }
const std::vector< ThreadPtr > & get() const
SyncVecThreadPoolExecutors & getSyncVecThreadPoolExecutors()

Member Function Documentation

void folly::ThreadPoolExecutor::add ( Func  )
overridepure virtual

Enqueue a function to executed by this executor. This and all variants must be threadsafe.

Implements folly::Executor.

Implemented in folly::CPUThreadPoolExecutor, and folly::IOThreadPoolExecutor.

virtual void folly::ThreadPoolExecutor::add ( Func  func,
std::chrono::milliseconds  expiration,
Func  expireCallback 
)
pure virtual
void folly::ThreadPoolExecutor::addObserver ( std::shared_ptr< Observer o)

Definition at line 349 of file ThreadPoolExecutor.cpp.

References activeThreads_, ensureActiveThreads(), folly::ThreadPoolExecutor::ThreadList::get(), maxThreads_, observers_, threadList_, and threadListLock_.

Referenced by TEST(), and folly::ThreadPoolExecutor::Observer::threadNotYetStopped().

349  {
350  {
351  SharedMutex::WriteHolder r{&threadListLock_};
352  observers_.push_back(o);
353  for (auto& thread : threadList_.get()) {
354  o->threadPreviouslyStarted(thread.get());
355  }
356  }
357  while (activeThreads_.load(std::memory_order_relaxed) <
358  maxThreads_.load(std::memory_order_relaxed)) {
360  }
361 }
std::atomic< size_t > activeThreads_
std::atomic< size_t > maxThreads_
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
void folly::ThreadPoolExecutor::addThreads ( size_t  n)
protected

Definition at line 180 of file ThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ThreadList::add(), folly::netops::bind(), i, makeThread(), observers_, threadFactory_, threadList_, and threadRun().

Referenced by ensureActiveThreads(), setNumThreads(), and setThreadDeathTimeout().

180  {
181  std::vector<ThreadPtr> newThreads;
182  for (size_t i = 0; i < n; i++) {
183  newThreads.push_back(makeThread());
184  }
185  for (auto& thread : newThreads) {
186  // TODO need a notion of failing to create the thread
187  // and then handling for that case
188  thread->handle = threadFactory_->newThread(
189  std::bind(&ThreadPoolExecutor::threadRun, this, thread));
190  threadList_.add(thread);
191  }
192  for (auto& thread : newThreads) {
193  thread->startupBaton.wait();
194  }
195  for (auto& o : observers_) {
196  for (auto& thread : newThreads) {
197  o->threadStarted(thread.get());
198  }
199  }
200 }
virtual void threadRun(ThreadPtr thread)=0
void add(const ThreadPtr &state)
std::shared_ptr< ThreadFactory > threadFactory_
std::vector< std::shared_ptr< Observer > > observers_
virtual ThreadPtr makeThread()
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
void folly::ThreadPoolExecutor::ensureActiveThreads ( )
protected

Definition at line 434 of file ThreadPoolExecutor.cpp.

References activeThreads_, addThreads(), folly::asymmetricLightBarrier(), ensureJoined(), maxThreads_, and threadListLock_.

Referenced by folly::IOThreadPoolExecutor::add(), folly::CPUThreadPoolExecutor::add(), addObserver(), and folly::IOThreadPoolExecutor::getEventBase().

434  {
435  ensureJoined();
436 
437  // Matches barrier in tryTimeoutThread(). Ensure task added
438  // is seen before loading activeThreads_ below.
440 
441  // Fast path assuming we are already at max threads.
442  auto active = activeThreads_.load(std::memory_order_relaxed);
443  auto total = maxThreads_.load(std::memory_order_relaxed);
444 
445  if (active >= total) {
446  return;
447  }
448 
449  SharedMutex::WriteHolder w{&threadListLock_};
450  // Double check behind lock.
451  active = activeThreads_.load(std::memory_order_relaxed);
452  total = maxThreads_.load(std::memory_order_relaxed);
453  if (active >= total) {
454  return;
455  }
457  activeThreads_.store(active + 1, std::memory_order_relaxed);
458 }
std::atomic< size_t > activeThreads_
std::atomic< size_t > maxThreads_
FOLLY_ALWAYS_INLINE void asymmetricLightBarrier()
void folly::ThreadPoolExecutor::ensureJoined ( )
protected

Definition at line 380 of file ThreadPoolExecutor.cpp.

References joinStoppedThreads(), threadListLock_, and threadsToJoin_.

Referenced by ensureActiveThreads().

380  {
381  auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
382  if (tojoin) {
383  {
384  SharedMutex::WriteHolder w{&threadListLock_};
385  tojoin = threadsToJoin_.load(std::memory_order_relaxed);
386  threadsToJoin_.store(0, std::memory_order_relaxed);
387  }
388  joinStoppedThreads(tojoin);
389  }
390 }
std::atomic< size_t > threadsToJoin_
std::string folly::ThreadPoolExecutor::getName ( )

Definition at line 287 of file ThreadPoolExecutor.cpp.

References folly::demangle(), folly::ThreadPoolExecutor::Thread::nextId, threadFactory_, and folly::toStdString().

287  {
288  auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
289  if (ntf == nullptr) {
290  return folly::demangle(typeid(*this).name()).toStdString();
291  }
292 
293  return ntf->getNamePrefix();
294 }
std::shared_ptr< ThreadFactory > threadFactory_
std::string toStdString(const folly::fbstring &s)
Definition: String.h:41
fbstring demangle(const char *name)
Definition: Demangle.cpp:111
size_t folly::ThreadPoolExecutor::getPendingTaskCount ( )

Definition at line 282 of file ThreadPoolExecutor.cpp.

References getPendingTaskCountImpl(), and threadListLock_.

282  {
283  SharedMutex::ReadHolder r{&threadListLock_};
284  return getPendingTaskCountImpl();
285 }
virtual size_t getPendingTaskCountImpl()=0
virtual size_t folly::ThreadPoolExecutor::getPendingTaskCountImpl ( )
protectedpure virtual
ThreadPoolExecutor::PoolStats folly::ThreadPoolExecutor::getPoolStats ( )

Definition at line 257 of file ThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::PoolStats::activeThreadCount, activeThreads_, folly::ThreadPoolExecutor::ThreadList::get(), getPendingTaskCountImpl(), folly::ThreadPoolExecutor::PoolStats::idleThreadCount, max, folly::ThreadPoolExecutor::PoolStats::maxIdleTime, maxThreads_, now(), folly::ThreadPoolExecutor::PoolStats::pendingTaskCount, folly::ThreadPoolExecutor::PoolStats::threadCount, threadList_, threadListLock_, and folly::ThreadPoolExecutor::PoolStats::totalTaskCount.

Referenced by TEST().

257  {
258  const auto now = std::chrono::steady_clock::now();
259  SharedMutex::ReadHolder r{&threadListLock_};
260  ThreadPoolExecutor::PoolStats stats;
261  size_t activeTasks = 0;
262  size_t idleAlive = 0;
263  for (auto thread : threadList_.get()) {
264  if (thread->idle) {
265  const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
266  stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
267  idleAlive++;
268  } else {
269  activeTasks++;
270  }
271  }
272  stats.pendingTaskCount = getPendingTaskCountImpl();
273  stats.totalTaskCount = stats.pendingTaskCount + activeTasks;
274 
275  stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
276  stats.activeThreadCount =
277  activeThreads_.load(std::memory_order_relaxed) - idleAlive;
278  stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
279  return stats;
280 }
std::atomic< size_t > activeThreads_
virtual size_t getPendingTaskCountImpl()=0
LogLevel max
Definition: LogLevel.cpp:31
std::chrono::steady_clock::time_point now()
std::atomic< size_t > maxThreads_
const std::vector< ThreadPtr > & get() const
std::shared_ptr<ThreadFactory> folly::ThreadPoolExecutor::getThreadFactory ( )
inline

Definition at line 74 of file ThreadPoolExecutor.h.

References f, join(), numActiveThreads(), numThreads(), setNumThreads(), stop(), threadFactory_, and withAll().

74  {
75  return threadFactory_;
76  }
std::shared_ptr< ThreadFactory > threadFactory_
void folly::ThreadPoolExecutor::join ( )

Definition at line 232 of file ThreadPoolExecutor.cpp.

References activeThreads_, folly::ThreadPoolExecutor::ThreadList::get(), joinKeepAliveOnce(), joinStoppedThreads(), maxThreads_, removeThreads(), folly::ThreadPoolExecutor::StoppedThreadQueue::size(), stoppedThreads_, threadList_, threadListLock_, and threadsToJoin_.

Referenced by getThreadFactory(), and TEST().

232  {
234  size_t n = 0;
235  {
236  SharedMutex::WriteHolder w{&threadListLock_};
237  maxThreads_.store(0, std::memory_order_release);
238  activeThreads_.store(0, std::memory_order_release);
239  n = threadList_.get().size();
240  removeThreads(n, true);
241  n += threadsToJoin_.load(std::memory_order_relaxed);
242  threadsToJoin_.store(0, std::memory_order_relaxed);
243  }
245  CHECK_EQ(0, threadList_.get().size());
246  CHECK_EQ(0, stoppedThreads_.size());
247 }
std::atomic< size_t > activeThreads_
std::atomic< size_t > threadsToJoin_
std::atomic< size_t > maxThreads_
const std::vector< ThreadPtr > & get() const
StoppedThreadQueue stoppedThreads_
void removeThreads(size_t n, bool isJoin)
void folly::ThreadPoolExecutor::joinKeepAliveOnce ( )
inlineprotected

Definition at line 318 of file ThreadPoolExecutor.h.

References folly::exchange(), folly::DefaultKeepAliveExecutor::joinKeepAlive(), and keepAliveJoined_.

Referenced by join(), stop(), and ~ThreadPoolExecutor().

318  {
319  if (!std::exchange(keepAliveJoined_, true)) {
320  joinKeepAlive();
321  }
322  }
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
void folly::ThreadPoolExecutor::joinStoppedThreads ( size_t  n)
protected

Definition at line 208 of file ThreadPoolExecutor.cpp.

References i, stoppedThreads_, and folly::ThreadPoolExecutor::StoppedThreadQueue::take().

Referenced by ensureJoined(), join(), setNumThreads(), and stop().

208  {
209  for (size_t i = 0; i < n; i++) {
210  auto thread = stoppedThreads_.take();
211  thread->handle.join();
212  }
213 }
StoppedThreadQueue stoppedThreads_
virtual ThreadPtr folly::ThreadPoolExecutor::makeThread ( )
inlineprotectedvirtual

Reimplemented in folly::IOThreadPoolExecutor.

Definition at line 222 of file ThreadPoolExecutor.h.

References getPendingTaskCountImpl().

Referenced by addThreads().

222  {
223  return std::make_shared<Thread>(this);
224  }
bool folly::ThreadPoolExecutor::minActive ( )
protected

Definition at line 462 of file ThreadPoolExecutor.cpp.

References activeThreads_, and minThreads_.

Referenced by tryTimeoutThread().

462  {
463  return activeThreads_.load(std::memory_order_relaxed) >
464  minThreads_.load(std::memory_order_relaxed);
465 }
std::atomic< size_t > activeThreads_
std::atomic< size_t > minThreads_
size_t folly::ThreadPoolExecutor::numActiveThreads ( )

Definition at line 120 of file ThreadPoolExecutor.cpp.

References activeThreads_.

Referenced by getThreadFactory().

120  {
121  return activeThreads_.load(std::memory_order_relaxed);
122 }
std::atomic< size_t > activeThreads_
size_t folly::ThreadPoolExecutor::numThreads ( )

Definition at line 116 of file ThreadPoolExecutor.cpp.

References maxThreads_.

Referenced by getThreadFactory(), setNumThreads(), and setThreadFactory().

116  {
117  return maxThreads_.load(std::memory_order_relaxed);
118 }
std::atomic< size_t > maxThreads_
void folly::ThreadPoolExecutor::removeObserver ( std::shared_ptr< Observer o)

Definition at line 363 of file ThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ThreadList::get(), observers_, threadList_, and threadListLock_.

Referenced by TEST(), and folly::ThreadPoolExecutor::Observer::threadNotYetStopped().

363  {
364  SharedMutex::WriteHolder r{&threadListLock_};
365  for (auto& thread : threadList_.get()) {
366  o->threadNotYetStopped(thread.get());
367  }
368 
369  for (auto it = observers_.begin(); it != observers_.end(); it++) {
370  if (*it == o) {
371  observers_.erase(it);
372  return;
373  }
374  }
375  DCHECK(false);
376 }
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
void folly::ThreadPoolExecutor::removeThreads ( size_t  n,
bool  isJoin 
)
protected

Definition at line 203 of file ThreadPoolExecutor.cpp.

References isJoin_, and stopThreads().

Referenced by join(), setNumThreads(), setThreadDeathTimeout(), and stop().

203  {
204  isJoin_ = isJoin;
205  stopThreads(n);
206 }
virtual void stopThreads(size_t n)=0
std::atomic< bool > isJoin_
void folly::ThreadPoolExecutor::runTask ( const ThreadPtr thread,
Task &&  task 
)
staticprotected

Definition at line 71 of file ThreadPoolExecutor.cpp.

References name, now(), and SCOPE_EXIT.

Referenced by folly::IOThreadPoolExecutor::add(), and folly::CPUThreadPoolExecutor::threadRun().

71  {
72  thread->idle = false;
73  auto startTime = std::chrono::steady_clock::now();
74  task.stats_.waitTime = startTime - task.enqueueTime_;
75  if (task.expiration_ > std::chrono::milliseconds(0) &&
76  task.stats_.waitTime >= task.expiration_) {
77  task.stats_.expired = true;
78  if (task.expireCallback_ != nullptr) {
79  task.expireCallback_();
80  }
81  } else {
82  folly::RequestContextScopeGuard rctx(task.context_);
83  try {
84  task.func_();
85  } catch (const std::exception& e) {
86  LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
87  << typeid(e).name() << " exception: " << e.what();
88  } catch (...) {
89  LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
90  "object";
91  }
92  task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
93  }
94  thread->idle = true;
95  thread->lastActiveTime = std::chrono::steady_clock::now();
96  thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
97  *thread->taskStatsCallbacks->inCallback = true;
98  SCOPE_EXIT {
99  *thread->taskStatsCallbacks->inCallback = false;
100  };
101  try {
102  for (auto& callback : callbacks) {
103  callback(task.stats_);
104  }
105  } catch (const std::exception& e) {
106  LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
107  "unhandled "
108  << typeid(e).name() << " exception: " << e.what();
109  } catch (...) {
110  LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
111  "unhandled non-exception object";
112  }
113  });
114 }
std::chrono::steady_clock::time_point now()
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
const char * name
Definition: http_parser.c:437
void folly::ThreadPoolExecutor::setNumThreads ( size_t  numThreads)

Definition at line 125 of file ThreadPoolExecutor.cpp.

References activeThreads_, addThreads(), getPendingTaskCountImpl(), joinStoppedThreads(), maxThreads_, min, minThreads_, numThreads(), observers_, removeThreads(), and threadListLock_.

Referenced by folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor(), getThreadFactory(), folly::IOThreadPoolExecutor::IOThreadPoolExecutor(), and TEST().

125  {
126  /* Since ThreadPoolExecutor may be dynamically adjusting the number of
127  threads, we adjust the relevant variables instead of changing
128  the number of threads directly. Roughly:
129 
130  If numThreads < minthreads reset minThreads to numThreads.
131 
132  If numThreads < active threads, reduce number of running threads.
133 
134  If the number of pending tasks is > 0, then increase the currently
135  active number of threads such that we can run all the tasks, or reach
136  numThreads.
137 
138  Note that if there are observers, we actually have to create all
139  the threads, because some observer implementations need to 'observe'
140  all thread creation (see tests for an example of this)
141  */
142 
143  size_t numThreadsToJoin = 0;
144  {
145  SharedMutex::WriteHolder w{&threadListLock_};
146  auto pending = getPendingTaskCountImpl();
147  maxThreads_.store(numThreads, std::memory_order_relaxed);
148  auto active = activeThreads_.load(std::memory_order_relaxed);
149  auto minthreads = minThreads_.load(std::memory_order_relaxed);
150  if (numThreads < minthreads) {
151  minthreads = numThreads;
152  minThreads_.store(numThreads, std::memory_order_relaxed);
153  }
154  if (active > numThreads) {
155  numThreadsToJoin = active - numThreads;
156  if (numThreadsToJoin > active - minthreads) {
157  numThreadsToJoin = active - minthreads;
158  }
159  ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
160  activeThreads_.store(
161  active - numThreadsToJoin, std::memory_order_relaxed);
162  } else if (pending > 0 || observers_.size() > 0 || active < minthreads) {
163  size_t numToAdd = std::min(pending, numThreads - active);
164  if (observers_.size() > 0) {
165  numToAdd = numThreads - active;
166  }
167  if (active + numToAdd < minthreads) {
168  numToAdd = minthreads - active;
169  }
171  activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
172  }
173  }
174 
175  /* We may have removed some threads, attempt to join them */
176  joinStoppedThreads(numThreadsToJoin);
177 }
std::atomic< size_t > activeThreads_
virtual size_t getPendingTaskCountImpl()=0
std::atomic< size_t > maxThreads_
std::vector< std::shared_ptr< Observer > > observers_
LogLevel min
Definition: LogLevel.cpp:30
void removeThreads(size_t n, bool isJoin)
std::atomic< size_t > minThreads_
void folly::ThreadPoolExecutor::setThreadDeathTimeout ( std::chrono::milliseconds  timeout)
inline

Definition at line 157 of file ThreadPoolExecutor.h.

References addThreads(), removeThreads(), and threadTimeout_.

Referenced by TEST().

157  {
159  }
std::chrono::milliseconds threadTimeout_
void folly::ThreadPoolExecutor::setThreadFactory ( std::shared_ptr< ThreadFactory threadFactory)
inline

Definition at line 69 of file ThreadPoolExecutor.h.

References folly::gen::move, numThreads(), and threadFactory_.

69  {
70  CHECK(numThreads() == 0);
71  threadFactory_ = std::move(threadFactory);
72  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::shared_ptr< ThreadFactory > threadFactory_
void folly::ThreadPoolExecutor::stop ( )

Definition at line 215 of file ThreadPoolExecutor.cpp.

References activeThreads_, folly::ThreadPoolExecutor::ThreadList::get(), joinKeepAliveOnce(), joinStoppedThreads(), maxThreads_, removeThreads(), folly::ThreadPoolExecutor::StoppedThreadQueue::size(), stoppedThreads_, threadList_, threadListLock_, and threadsToJoin_.

Referenced by getThreadFactory(), stop< IOThreadPoolExecutor >(), folly::CPUThreadPoolExecutor::~CPUThreadPoolExecutor(), and folly::IOThreadPoolExecutor::~IOThreadPoolExecutor().

215  {
217  size_t n = 0;
218  {
219  SharedMutex::WriteHolder w{&threadListLock_};
220  maxThreads_.store(0, std::memory_order_release);
221  activeThreads_.store(0, std::memory_order_release);
222  n = threadList_.get().size();
223  removeThreads(n, false);
224  n += threadsToJoin_.load(std::memory_order_relaxed);
225  threadsToJoin_.store(0, std::memory_order_relaxed);
226  }
228  CHECK_EQ(0, threadList_.get().size());
229  CHECK_EQ(0, stoppedThreads_.size());
230 }
std::atomic< size_t > activeThreads_
std::atomic< size_t > threadsToJoin_
std::atomic< size_t > maxThreads_
const std::vector< ThreadPtr > & get() const
StoppedThreadQueue stoppedThreads_
void removeThreads(size_t n, bool isJoin)
virtual void folly::ThreadPoolExecutor::stopThreads ( size_t  n)
protectedpure virtual
void folly::ThreadPoolExecutor::subscribeToTaskStats ( TaskStatsCallback  cb)

Definition at line 298 of file ThreadPoolExecutor.cpp.

References folly::gen::move, and taskStatsCallbacks_.

298  {
299  if (*taskStatsCallbacks_->inCallback) {
300  throw std::runtime_error("cannot subscribe in task stats callback");
301  }
302  taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
303 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
virtual void folly::ThreadPoolExecutor::threadRun ( ThreadPtr  thread)
protectedpure virtual
bool folly::ThreadPoolExecutor::tryTimeoutThread ( )
protected

Definition at line 393 of file ThreadPoolExecutor.cpp.

References activeThreads_, folly::asymmetricHeavyBarrier(), getPendingTaskCountImpl(), minActive(), and threadsToJoin_.

Referenced by folly::CPUThreadPoolExecutor::taskShouldStop().

393  {
394  // Try to stop based on idle thread timeout (try_take_for),
395  // if there are at least minThreads running.
396  if (!minActive()) {
397  return false;
398  }
399 
400  // Remove thread from active count
401  activeThreads_.store(
402  activeThreads_.load(std::memory_order_relaxed) - 1,
403  std::memory_order_relaxed);
404 
405  // There is a memory ordering constraint w.r.t the queue
406  // implementation's add() and getPendingTaskCountImpl() - while many
407  // queues have seq_cst ordering, some do not, so add an explicit
408  // barrier. tryTimeoutThread is the slow path and only happens once
409  // every thread timeout; use asymmetric barrier to keep add() fast.
411 
412  // If this is based on idle thread timeout, then
413  // adjust vars appropriately (otherwise stop() or join()
414  // does this).
415  if (getPendingTaskCountImpl() > 0) {
416  // There are still pending tasks, we can't stop yet.
417  // re-up active threads and return.
418  activeThreads_.store(
419  activeThreads_.load(std::memory_order_relaxed) + 1,
420  std::memory_order_relaxed);
421  return false;
422  }
423 
424  threadsToJoin_.store(
425  threadsToJoin_.load(std::memory_order_relaxed) + 1,
426  std::memory_order_relaxed);
427 
428  return true;
429 }
std::atomic< size_t > activeThreads_
virtual size_t getPendingTaskCountImpl()=0
std::atomic< size_t > threadsToJoin_
void asymmetricHeavyBarrier(AMBFlags flags)
void folly::ThreadPoolExecutor::withAll ( FunctionRef< void(ThreadPoolExecutor &)>  f)
static

Execute f against all ThreadPoolExecutors, primarily for retrieving and exporting stats.

Definition at line 249 of file ThreadPoolExecutor.cpp.

References f, and folly::getSyncVecThreadPoolExecutors().

Referenced by getNumThreadPoolExecutors(), and getThreadFactory().

249  {
250  getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
251  for (auto tpe : tpes) {
252  f(*tpe);
253  }
254  });
255 }
auto f
SyncVecThreadPoolExecutors & getSyncVecThreadPoolExecutors()

Member Data Documentation

std::atomic<size_t> folly::ThreadPoolExecutor::activeThreads_ {0}
protected
std::atomic<bool> folly::ThreadPoolExecutor::isJoin_ {false}
protected
const bool folly::ThreadPoolExecutor::isWaitForAll_
protected

Definition at line 288 of file ThreadPoolExecutor.h.

Referenced by folly::IOThreadPoolExecutor::threadRun().

bool folly::ThreadPoolExecutor::keepAliveJoined_ {false}
protected

Definition at line 324 of file ThreadPoolExecutor.h.

Referenced by joinKeepAliveOnce().

std::atomic<size_t> folly::ThreadPoolExecutor::maxThreads_ {0}
protected
std::atomic<size_t> folly::ThreadPoolExecutor::minThreads_ {0}
protected

Definition at line 312 of file ThreadPoolExecutor.h.

Referenced by minActive(), and setNumThreads().

std::vector<std::shared_ptr<Observer> > folly::ThreadPoolExecutor::observers_
protected
StoppedThreadQueue folly::ThreadPoolExecutor::stoppedThreads_
protected
std::shared_ptr<TaskStatsCallbackRegistry> folly::ThreadPoolExecutor::taskStatsCallbacks_
protected

Definition at line 299 of file ThreadPoolExecutor.h.

Referenced by subscribeToTaskStats().

std::shared_ptr<ThreadFactory> folly::ThreadPoolExecutor::threadFactory_
protected

Definition at line 287 of file ThreadPoolExecutor.h.

Referenced by addThreads(), getName(), getThreadFactory(), and setThreadFactory().

folly::ThreadPoolListHook folly::ThreadPoolExecutor::threadPoolHook_
protected
std::atomic<size_t> folly::ThreadPoolExecutor::threadsToJoin_ {0}
protected

Definition at line 315 of file ThreadPoolExecutor.h.

Referenced by ensureJoined(), join(), stop(), and tryTimeoutThread().

std::chrono::milliseconds folly::ThreadPoolExecutor::threadTimeout_ {0}
protected

The documentation for this class was generated from the following files: