proxygen
folly::IOThreadPoolExecutor Class Reference

#include <IOThreadPoolExecutor.h>

Inheritance diagram for folly::IOThreadPoolExecutor:
folly::ThreadPoolExecutor folly::IOExecutor folly::DefaultKeepAliveExecutor folly::Executor folly::Executor

Classes

struct  IOThread
 

Public Member Functions

 IOThreadPoolExecutor (size_t numThreads, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("IOThreadPool"), folly::EventBaseManager *ebm=folly::EventBaseManager::get(), bool waitForAll=false)
 
 ~IOThreadPoolExecutor () override
 
void add (Func func) override
 
void add (Func func, std::chrono::milliseconds expiration, Func expireCallback=nullptr) override
 
folly::EventBase * getEventBase () override
 
folly::EventBaseManager * getEventBaseManager ()
 
- Public Member Functions inherited from folly::ThreadPoolExecutor
 ThreadPoolExecutor (size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
 
 ~ThreadPoolExecutor () override
 
void setThreadFactory (std::shared_ptr< ThreadFactory > threadFactory)
 
std::shared_ptr< ThreadFactory > getThreadFactory ()
 
size_t numThreads ()
 
void setNumThreads (size_t numThreads)
 
size_t numActiveThreads ()
 
void stop ()
 
void join ()
 
PoolStats getPoolStats ()
 
size_t getPendingTaskCount ()
 
std::string getName ()
 
void subscribeToTaskStats (TaskStatsCallback cb)
 
void addObserver (std::shared_ptr< Observer >)
 
void removeObserver (std::shared_ptr< Observer >)
 
void setThreadDeathTimeout (std::chrono::milliseconds timeout)
 
- Public Member Functions inherited from folly::DefaultKeepAliveExecutor
 DefaultKeepAliveExecutor ()
 
virtual ~DefaultKeepAliveExecutor ()
 
folly::Executor::KeepAlive weakRef ()
 
- Public Member Functions inherited from folly::Executor
virtual ~Executor ()
 
virtual void addWithPriority (Func, int8_t priority)
 
virtual uint8_t getNumPriorities () const
 
- Public Member Functions inherited from folly::IOExecutor
 ~IOExecutor () override=default
 

Static Public Member Functions

static folly::EventBase * getEventBase (ThreadPoolExecutor::ThreadHandle *)
 
- Static Public Member Functions inherited from folly::ThreadPoolExecutor
static void withAll (FunctionRef< void(ThreadPoolExecutor &)> f)
 
- Static Public Member Functions inherited from folly::Executor
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT *executor)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > getKeepAliveToken (ExecutorT &executor)
 

Private Member Functions

ThreadPtr makeThread () override
 
std::shared_ptr< IOThread > pickThread ()
 
void threadRun (ThreadPtr thread) override
 
void stopThreads (size_t n) override
 
size_t getPendingTaskCountImpl () override
 

Private Attributes

std::atomic< size_t > nextThread_
 
folly::ThreadLocal< std::shared_ptr< IOThread > > thisThread_
 
folly::EventBaseManager * eventBaseManager_
 

Additional Inherited Members

- Public Types inherited from folly::ThreadPoolExecutor
using TaskStatsCallback = std::function< void(TaskStats)>
 
- Static Public Attributes inherited from folly::Executor
static const int8_t LO_PRI = SCHAR_MIN
 
static const int8_t MID_PRI = 0
 
static const int8_t HI_PRI = SCHAR_MAX
 
- Protected Types inherited from folly::ThreadPoolExecutor
typedef std::shared_ptr< Thread > ThreadPtr
 
- Protected Member Functions inherited from folly::ThreadPoolExecutor
void addThreads (size_t n)
 
void removeThreads (size_t n, bool isJoin)
 
void joinStoppedThreads (size_t n)
 
void ensureActiveThreads ()
 
void ensureJoined ()
 
bool minActive ()
 
bool tryTimeoutThread ()
 
void joinKeepAliveOnce ()
 
- Protected Member Functions inherited from folly::DefaultKeepAliveExecutor
void joinKeepAlive ()
 
- Static Protected Member Functions inherited from folly::ThreadPoolExecutor
static void runTask (const ThreadPtr &thread, Task &&task)
 
- Static Protected Member Functions inherited from folly::Executor
template<typename ExecutorT >
static bool isKeepAliveDummy (const KeepAlive< ExecutorT > &keepAlive)
 
template<typename ExecutorT >
static KeepAlive< ExecutorT > makeKeepAlive (ExecutorT *executor)
 
- Protected Attributes inherited from folly::ThreadPoolExecutor
std::shared_ptr< ThreadFactory > threadFactory_
 
const bool isWaitForAll_
 
ThreadList threadList_
 
SharedMutex threadListLock_
 
StoppedThreadQueue stoppedThreads_
 
std::atomic< bool > isJoin_ {false}
 
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
 
std::vector< std::shared_ptr< Observer > > observers_
 
folly::ThreadPoolListHook threadPoolHook_
 
std::atomic< size_t > maxThreads_ {0}
 
std::atomic< size_t > minThreads_ {0}
 
std::atomic< size_t > activeThreads_ {0}
 
std::atomic< size_t > threadsToJoin_ {0}
 
std::chrono::milliseconds threadTimeout_ {0}
 
bool keepAliveJoined_ {false}
 

Detailed Description

A Thread Pool for IO bound tasks

Note
Uses event_fd for notification, and waking an epoll loop. There is one queue (NotificationQueue specifically) per thread/epoll. If the thread is already running and not waiting on epoll, we don't make any additional syscalls to wake up the loop, just put the new task in the queue. If any thread has been waiting for more than a few seconds, its stack is madvised away. Currently however tasks are scheduled round robin on the queues, so unless there is no work going on, this isn't very effective. Since there is one queue per thread, there is hardly any contention on the queues - so a simple spinlock around an std::deque is used for the tasks. There is no max queue size. By default, there is one thread per core - it usually doesn't make sense to have more IO threads than this, assuming they don't block.
getEventBase() will return an EventBase you can schedule IO work on directly, chosen round-robin.
N.B. For this thread pool, stop() behaves like join() because outstanding tasks belong to the event base and will be executed upon its destruction.

Definition at line 52 of file IOThreadPoolExecutor.h.

Constructor & Destructor Documentation

folly::IOThreadPoolExecutor::IOThreadPoolExecutor ( size_t  numThreads,
std::shared_ptr< ThreadFactory > threadFactory = std::make_shared<NamedThreadFactory>("IOThreadPool"),
folly::EventBaseManager * ebm = folly::EventBaseManager::get(),
bool  waitForAll = false 
)
explicit

Definition at line 70 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::setNumThreads().

76  numThreads,
77  FLAGS_dynamic_iothreadpoolexecutor ? 0 : numThreads,
78  std::move(threadFactory),
79  waitForAll),
80  nextThread_(0),
81  eventBaseManager_(ebm) {
82  setNumThreads(numThreads);
83 }
std::atomic< size_t > nextThread_
ThreadPoolExecutor(size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void setNumThreads(size_t numThreads)
folly::EventBaseManager * eventBaseManager_
folly::IOThreadPoolExecutor::~IOThreadPoolExecutor ( )
override

Definition at line 85 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::stop().

85  {
86  stop();
87 }

Member Function Documentation

void folly::IOThreadPoolExecutor::add ( Func  )
override virtual

Enqueue a function to be executed by this executor. This and all variants must be threadsafe.

Implements folly::ThreadPoolExecutor.

Definition at line 89 of file IOThreadPoolExecutor.cpp.

References folly::gen::move.

Referenced by destroy< IOThreadPoolExecutor >(), and stop< IOThreadPoolExecutor >().

89  {
90  add(std::move(func), std::chrono::milliseconds(0));
91 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void add(Func func) override
void folly::IOThreadPoolExecutor::add ( Func  func,
std::chrono::milliseconds  expiration,
Func  expireCallback = nullptr 
)
override virtual

Implements folly::ThreadPoolExecutor.

Definition at line 93 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ensureActiveThreads(), folly::ThreadPoolExecutor::ThreadList::get(), folly::gen::move, pickThread(), folly::ThreadPoolExecutor::runTask(), folly::ThreadPoolExecutor::threadList_, and folly::ThreadPoolExecutor::threadListLock_.

96  {
97  ensureActiveThreads();
98  SharedMutex::ReadHolder r{&threadListLock_};
99  if (threadList_.get().empty()) {
100  throw std::runtime_error("No threads available");
101  }
102  auto ioThread = pickThread();
103 
104  auto task = Task(std::move(func), expiration, std::move(expireCallback));
105  auto wrappedFunc = [ioThread, task = std::move(task)]() mutable {
106  runTask(ioThread, std::move(task));
107  ioThread->pendingTasks--;
108  };
109 
110  ioThread->pendingTasks++;
111  if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
112  ioThread->pendingTasks--;
113  throw std::runtime_error("Unable to run func in event base thread");
114  }
115 }
static void runTask(const ThreadPtr &thread, Task &&task)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static void expiration()
const std::vector< ThreadPtr > & get() const
std::shared_ptr< IOThread > pickThread()
EventBase * folly::IOThreadPoolExecutor::getEventBase ( )
override virtual

Implements folly::IOExecutor.

Definition at line 144 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ensureActiveThreads(), pickThread(), and folly::ThreadPoolExecutor::threadListLock_.

Referenced by wangle::ServerWorkerPool::threadStarted().

144  {
145  ensureActiveThreads();
146  SharedMutex::ReadHolder r{&threadListLock_};
147  return pickThread()->eventBase;
148 }
std::shared_ptr< IOThread > pickThread()
EventBase * folly::IOThreadPoolExecutor::getEventBase ( ThreadPoolExecutor::ThreadHandle * h )
static

Definition at line 150 of file IOThreadPoolExecutor.cpp.

References folly::IOThreadPoolExecutor::IOThread::eventBase, and h.

151  {
152  auto thread = dynamic_cast<IOThread*>(h);
153 
154  if (thread) {
155  return thread->eventBase;
156  }
157 
158  return nullptr;
159 }
*than *hazptr_holder h
Definition: Hazptr.h:116
EventBaseManager * folly::IOThreadPoolExecutor::getEventBaseManager ( )

Definition at line 161 of file IOThreadPoolExecutor.cpp.

References eventBaseManager_.

161  {
162  return eventBaseManager_;
163 }
folly::EventBaseManager * eventBaseManager_
size_t folly::IOThreadPoolExecutor::getPendingTaskCountImpl ( )
override private virtual

Implements folly::ThreadPoolExecutor.

Definition at line 225 of file IOThreadPoolExecutor.cpp.

References count, folly::ThreadPoolExecutor::ThreadList::get(), folly::IOThreadPoolExecutor::IOThread::pendingTasks, and folly::ThreadPoolExecutor::threadList_.

225  {
226  size_t count = 0;
227  for (const auto& thread : threadList_.get()) {
228  auto ioThread = std::static_pointer_cast<IOThread>(thread);
229  size_t pendingTasks = ioThread->pendingTasks;
230  if (pendingTasks > 0 && !ioThread->idle) {
231  pendingTasks--;
232  }
233  count += pendingTasks;
234  }
235  return count;
236 }
const std::vector< ThreadPtr > & get() const
int * count
std::shared_ptr< ThreadPoolExecutor::Thread > folly::IOThreadPoolExecutor::makeThread ( )
override private virtual

Reimplemented from folly::ThreadPoolExecutor.

Definition at line 165 of file IOThreadPoolExecutor.cpp.

165  {
166  return std::make_shared<IOThread>(this);
167 }
std::shared_ptr< IOThreadPoolExecutor::IOThread > folly::IOThreadPoolExecutor::pickThread ( )
private

Definition at line 118 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::ThreadList::get(), nextThread_, thisThread_, and folly::ThreadPoolExecutor::threadList_.

Referenced by add(), and getEventBase().

118  {
119  auto& me = *thisThread_;
120  auto& ths = threadList_.get();
121  // When new task is added to IOThreadPoolExecutor, a thread is chosen for it
122  // to be executed on, thisThread_ is by default chosen, however, if the new
123  // task is added by the clean up operations on thread destruction, thisThread_
124  // is not an available thread anymore, thus, always check whether or not
125  // thisThread_ is an available thread before choosing it.
126  if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
127  return me;
128  }
129  auto n = ths.size();
130  if (n == 0) {
131  // XXX I think the only way this can happen is if somebody calls
132  // getEventBase (1) from one of the executor's threads while the executor
133  // is stopping or getting downsized to zero or (2) from outside the executor
134  // when it has no threads. In the first case, it's not obvious what the
135  // correct behavior should be-- do we really want to return ourselves even
136  // though we're about to exit? (The comment above seems to imply no.) In
137  // the second case, `!me` so we'll crash anyway.
138  return me;
139  }
140  auto thread = ths[nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
141  return std::static_pointer_cast<IOThread>(thread);
142 }
std::atomic< size_t > nextThread_
const std::vector< ThreadPtr > & get() const
folly::ThreadLocal< std::shared_ptr< IOThread > > thisThread_
void folly::IOThreadPoolExecutor::stopThreads ( size_t  n)
override private virtual

Implements folly::ThreadPoolExecutor.

Definition at line 202 of file IOThreadPoolExecutor.cpp.

References folly::ThreadPoolExecutor::StoppedThreadQueue::add(), folly::ThreadPoolExecutor::ThreadList::get(), folly::gen::guard(), i, folly::ThreadPoolExecutor::observers_, folly::ThreadPoolExecutor::ThreadList::remove(), folly::IOThreadPoolExecutor::IOThread::shouldRun, folly::ThreadPoolExecutor::stoppedThreads_, and folly::ThreadPoolExecutor::threadList_.

202  {
203  std::vector<ThreadPtr> stoppedThreads;
204  stoppedThreads.reserve(n);
205  for (size_t i = 0; i < n; i++) {
206  const auto ioThread =
207  std::static_pointer_cast<IOThread>(threadList_.get()[i]);
208  for (auto& o : observers_) {
209  o->threadStopped(ioThread.get());
210  }
211  ioThread->shouldRun = false;
212  stoppedThreads.push_back(ioThread);
213  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
214  if (ioThread->eventBase) {
215  ioThread->eventBase->terminateLoopSoon();
216  }
217  }
218  for (auto thread : stoppedThreads) {
219  stoppedThreads_.add(thread);
220  threadList_.remove(thread);
221  }
222 }
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
void remove(const ThreadPtr &state)
BlockingQueueAddResult add(ThreadPtr item) override
StoppedThreadQueue stoppedThreads_
void folly::IOThreadPoolExecutor::threadRun ( ThreadPtr  thread)
override private virtual

Implements folly::ThreadPoolExecutor.

Definition at line 169 of file IOThreadPoolExecutor.cpp.

References folly::EventBaseManager::clearEventBase(), folly::IOThreadPoolExecutor::IOThread::eventBase, eventBaseManager_, folly::EventBaseManager::getEventBase(), folly::gen::guard(), folly::ThreadPoolExecutor::isJoin_, folly::ThreadPoolExecutor::isWaitForAll_, folly::ThreadPoolListHook::registerThread(), thisThread_, and folly::ThreadPoolExecutor::threadPoolHook_.

169  {
170  this->threadPoolHook_.registerThread();
171 
172  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
173  ioThread->eventBase = eventBaseManager_->getEventBase();
174  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
175 
176  auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
177  ioThread->eventBase->runBeforeLoop(idler.get());
178 
179  ioThread->eventBase->runInEventBaseThread(
180  [thread] { thread->startupBaton.post(); });
181  while (ioThread->shouldRun) {
182  ioThread->eventBase->loopForever();
183  }
184  if (isJoin_) {
185  while (ioThread->pendingTasks > 0) {
186  ioThread->eventBase->loopOnce();
187  }
188  }
189  idler.reset();
190  if (isWaitForAll_) {
191  // some tasks, like thrift asynchronous calls, create additional
192  // event base hookups, let's wait till all of them complete.
193  ioThread->eventBase->loop();
194  }
195 
196  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
197  ioThread->eventBase = nullptr;
198  eventBaseManager_->clearEventBase();
199 }
EventBase * getEventBase() const
folly::ThreadPoolListHook threadPoolHook_
std::atomic< bool > isJoin_
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
folly::EventBaseManager * eventBaseManager_
folly::ThreadLocal< std::shared_ptr< IOThread > > thisThread_

Member Data Documentation

folly::EventBaseManager* folly::IOThreadPoolExecutor::eventBaseManager_
private

Definition at line 94 of file IOThreadPoolExecutor.h.

Referenced by getEventBaseManager(), and threadRun().

std::atomic<size_t> folly::IOThreadPoolExecutor::nextThread_
private

Definition at line 92 of file IOThreadPoolExecutor.h.

Referenced by pickThread().

folly::ThreadLocal<std::shared_ptr<IOThread> > folly::IOThreadPoolExecutor::thisThread_
private

Definition at line 93 of file IOThreadPoolExecutor.h.

Referenced by pickThread(), and threadRun().


The documentation for this class was generated from the following files: