19 #include <glog/logging.h> 25 dynamic_iothreadpoolexecutor,
27 "IOThreadPoolExecutor will dynamically create threads");
51 std::chrono::steady_clock::duration idleTimeout =
57 std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
72 std::shared_ptr<ThreadFactory> threadFactory,
77 FLAGS_dynamic_iothreadpoolexecutor ? 0 : numThreads,
81 eventBaseManager_(ebm) {
96 Func expireCallback) {
100 throw std::runtime_error(
"No threads available");
105 auto wrappedFunc = [ioThread, task =
std::move(task)]()
mutable {
107 ioThread->pendingTasks--;
110 ioThread->pendingTasks++;
111 if (!ioThread->eventBase->runInEventBaseThread(
std::move(wrappedFunc))) {
112 ioThread->pendingTasks--;
113 throw std::runtime_error(
"Unable to run func in event base thread");
117 std::shared_ptr<IOThreadPoolExecutor::IOThread>
126 if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
140 auto thread = ths[
nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
141 return std::static_pointer_cast<
IOThread>(thread);
152 auto thread =
dynamic_cast<IOThread*
>(
h);
166 return std::make_shared<IOThread>(
this);
172 const auto ioThread = std::static_pointer_cast<
IOThread>(thread);
174 thisThread_.reset(
new std::shared_ptr<IOThread>(ioThread));
176 auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
177 ioThread->eventBase->runBeforeLoop(idler.get());
179 ioThread->eventBase->runInEventBaseThread(
180 [thread] { thread->startupBaton.post(); });
181 while (ioThread->shouldRun) {
182 ioThread->eventBase->loopForever();
185 while (ioThread->pendingTasks > 0) {
186 ioThread->eventBase->loopOnce();
193 ioThread->eventBase->loop();
196 std::lock_guard<std::mutex>
guard(ioThread->eventBaseShutdownMutex_);
197 ioThread->eventBase =
nullptr;
203 std::vector<ThreadPtr> stoppedThreads;
204 stoppedThreads.reserve(n);
205 for (
size_t i = 0;
i < n;
i++) {
206 const auto ioThread =
209 o->threadStopped(ioThread.get());
212 stoppedThreads.push_back(ioThread);
213 std::lock_guard<std::mutex>
guard(ioThread->eventBaseShutdownMutex_);
214 if (ioThread->eventBase) {
215 ioThread->eventBase->terminateLoopSoon();
218 for (
auto thread : stoppedThreads) {
228 auto ioThread = std::static_pointer_cast<
IOThread>(thread);
230 if (pendingTasks > 0 && !ioThread->idle) {
233 count += pendingTasks;
static void runTask(const ThreadPtr &thread, Task &&task)
DEFINE_bool(dynamic_iothreadpoolexecutor, true,"IOThreadPoolExecutor will dynamically create threads")
void runLoopCallback() noexceptoverride
std::atomic< size_t > nextThread_
EventBase * getEventBase() const
SharedMutex threadListLock_
ThreadPtr makeThread() override
static void unmapUnusedStack(size_t retain=kDefaultStackToRetain)
static IdleTime getVariationTimeout(IdleTime const &idleTimeout=defaultIdleTimeout.load(std::memory_order_acquire), float timeoutVariationFrac=0.5)
constexpr detail::Map< Move > move
folly::EventBaseManager * getEventBaseManager()
folly::ThreadPoolListHook threadPoolHook_
T load(std::memory_order mo=std::memory_order_seq_cst) const noexcept
—— Concurrent Priority Queue Implementation ——
std::shared_ptr< Thread > ThreadPtr
std::atomic< bool > isJoin_
requires E e noexcept(noexcept(s.error(std::move(e))))
size_t getPendingTaskCountImpl() override
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
void threadRun(ThreadPtr thread) override
std::atomic< size_t > pendingTasks
void add(Func func) override
std::atomic< bool > shouldRun
GuardImpl guard(ErrorHandler &&handler)
void remove(const ThreadPtr &state)
void timeoutExpired() noexceptoverride
void setNumThreads(size_t numThreads)
void ensureActiveThreads()
folly::EventBaseManager * eventBaseManager_
~IOThreadPoolExecutor() override
folly::EventBase * getEventBase() override
BlockingQueueAddResult add(ThreadPtr item) override
static AtomicStruct< std::chrono::steady_clock::duration > defaultIdleTimeout
StoppedThreadQueue stoppedThreads_
bool scheduleTimeout(uint32_t milliseconds)
MemoryIdlerTimeout(EventBase *b)
folly::ThreadLocal< std::shared_ptr< IOThread > > thisThread_
IOThreadPoolExecutor(size_t numThreads, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("IOThreadPool"), folly::EventBaseManager *ebm=folly::EventBaseManager::get(), bool waitForAll=false)
void runBeforeLoop(LoopCallback *callback)
void stopThreads(size_t n) override
folly::EventBase * eventBase
static void flushLocalMallocCaches()
std::shared_ptr< IOThread > pickThread()