proxygen
IOThreadPoolExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <glog/logging.h>
20 
23 
25  dynamic_iothreadpoolexecutor,
26  true,
27  "IOThreadPoolExecutor will dynamically create threads");
28 
29 namespace folly {
30 
32 
33 /* Class that will free jemalloc caches and madvise the stack away
34  * if the event loop is unused for some period of time
35  */
37  public:
39 
40  void timeoutExpired() noexcept override {
41  idled = true;
42  }
43 
44  void runLoopCallback() noexcept override {
45  if (idled) {
48 
49  idled = false;
50  } else {
51  std::chrono::steady_clock::duration idleTimeout =
52  MemoryIdler::defaultIdleTimeout.load(std::memory_order_acquire);
53 
54  idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
55 
56  scheduleTimeout(static_cast<uint32_t>(
57  std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
58  .count()));
59  }
60 
61  // reschedule this callback for the next event loop.
62  base_->runBeforeLoop(this);
63  }
64 
65  private:
67  bool idled{false};
68 };
69 
71  size_t numThreads,
72  std::shared_ptr<ThreadFactory> threadFactory,
73  EventBaseManager* ebm,
74  bool waitForAll)
76  numThreads,
77  FLAGS_dynamic_iothreadpoolexecutor ? 0 : numThreads,
78  std::move(threadFactory),
79  waitForAll),
80  nextThread_(0),
81  eventBaseManager_(ebm) {
82  setNumThreads(numThreads);
83 }
84 
86  stop();
87 }
88 
90  add(std::move(func), std::chrono::milliseconds(0));
91 }
92 
94  Func func,
95  std::chrono::milliseconds expiration,
96  Func expireCallback) {
99  if (threadList_.get().empty()) {
100  throw std::runtime_error("No threads available");
101  }
102  auto ioThread = pickThread();
103 
104  auto task = Task(std::move(func), expiration, std::move(expireCallback));
105  auto wrappedFunc = [ioThread, task = std::move(task)]() mutable {
106  runTask(ioThread, std::move(task));
107  ioThread->pendingTasks--;
108  };
109 
110  ioThread->pendingTasks++;
111  if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
112  ioThread->pendingTasks--;
113  throw std::runtime_error("Unable to run func in event base thread");
114  }
115 }
116 
117 std::shared_ptr<IOThreadPoolExecutor::IOThread>
119  auto& me = *thisThread_;
120  auto& ths = threadList_.get();
121  // When new task is added to IOThreadPoolExecutor, a thread is chosen for it
122  // to be executed on, thisThread_ is by default chosen, however, if the new
123  // task is added by the clean up operations on thread destruction, thisThread_
124  // is not an available thread anymore, thus, always check whether or not
125  // thisThread_ is an available thread before choosing it.
126  if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
127  return me;
128  }
129  auto n = ths.size();
130  if (n == 0) {
131  // XXX I think the only way this can happen is if somebody calls
132  // getEventBase (1) from one of the executor's threads while the executor
133  // is stopping or getting downsized to zero or (2) from outside the executor
134  // when it has no threads. In the first case, it's not obvious what the
135  // correct behavior should be-- do we really want to return ourselves even
136  // though we're about to exit? (The comment above seems to imply no.) In
137  // the second case, `!me` so we'll crash anyway.
138  return me;
139  }
140  auto thread = ths[nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
141  return std::static_pointer_cast<IOThread>(thread);
142 }
143 
147  return pickThread()->eventBase;
148 }
149 
152  auto thread = dynamic_cast<IOThread*>(h);
153 
154  if (thread) {
155  return thread->eventBase;
156  }
157 
158  return nullptr;
159 }
160 
162  return eventBaseManager_;
163 }
164 
165 std::shared_ptr<ThreadPoolExecutor::Thread> IOThreadPoolExecutor::makeThread() {
166  return std::make_shared<IOThread>(this);
167 }
168 
171 
172  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
174  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
175 
176  auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
177  ioThread->eventBase->runBeforeLoop(idler.get());
178 
179  ioThread->eventBase->runInEventBaseThread(
180  [thread] { thread->startupBaton.post(); });
181  while (ioThread->shouldRun) {
182  ioThread->eventBase->loopForever();
183  }
184  if (isJoin_) {
185  while (ioThread->pendingTasks > 0) {
186  ioThread->eventBase->loopOnce();
187  }
188  }
189  idler.reset();
190  if (isWaitForAll_) {
191  // some tasks, like thrift asynchronous calls, create additional
192  // event base hookups, let's wait till all of them complete.
193  ioThread->eventBase->loop();
194  }
195 
196  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
197  ioThread->eventBase = nullptr;
199 }
200 
201 // threadListLock_ is writelocked
203  std::vector<ThreadPtr> stoppedThreads;
204  stoppedThreads.reserve(n);
205  for (size_t i = 0; i < n; i++) {
206  const auto ioThread =
207  std::static_pointer_cast<IOThread>(threadList_.get()[i]);
208  for (auto& o : observers_) {
209  o->threadStopped(ioThread.get());
210  }
211  ioThread->shouldRun = false;
212  stoppedThreads.push_back(ioThread);
213  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
214  if (ioThread->eventBase) {
215  ioThread->eventBase->terminateLoopSoon();
216  }
217  }
218  for (auto thread : stoppedThreads) {
219  stoppedThreads_.add(thread);
220  threadList_.remove(thread);
221  }
222 }
223 
224 // threadListLock_ is readlocked
226  size_t count = 0;
227  for (const auto& thread : threadList_.get()) {
228  auto ioThread = std::static_pointer_cast<IOThread>(thread);
229  size_t pendingTasks = ioThread->pendingTasks;
230  if (pendingTasks > 0 && !ioThread->idle) {
231  pendingTasks--;
232  }
233  count += pendingTasks;
234  }
235  return count;
236 }
237 
238 } // namespace folly
static void runTask(const ThreadPtr &thread, Task &&task)
DEFINE_bool(dynamic_iothreadpoolexecutor, true,"IOThreadPoolExecutor will dynamically create threads")
void runLoopCallback() noexceptoverride
std::atomic< size_t > nextThread_
*than *hazptr_holder h
Definition: Hazptr.h:116
EventBase * getEventBase() const
static void unmapUnusedStack(size_t retain=kDefaultStackToRetain)
char b
static IdleTime getVariationTimeout(IdleTime const &idleTimeout=defaultIdleTimeout.load(std::memory_order_acquire), float timeoutVariationFrac=0.5)
Definition: MemoryIdler.h:70
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::EventBaseManager * getEventBaseManager()
folly::ThreadPoolListHook threadPoolHook_
STL namespace.
T load(std::memory_order mo=std::memory_order_seq_cst) const noexcept
Definition: AtomicStruct.h:141
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::shared_ptr< Thread > ThreadPtr
std::atomic< bool > isJoin_
requires E e noexcept(noexcept(s.error(std::move(e))))
static void expiration()
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
void threadRun(ThreadPtr thread) override
void add(Func func) override
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
void remove(const ThreadPtr &state)
void timeoutExpired() noexceptoverride
void setNumThreads(size_t numThreads)
folly::EventBaseManager * eventBaseManager_
int * count
folly::EventBase * getEventBase() override
BlockingQueueAddResult add(ThreadPtr item) override
static AtomicStruct< std::chrono::steady_clock::duration > defaultIdleTimeout
Definition: MemoryIdler.h:64
StoppedThreadQueue stoppedThreads_
bool scheduleTimeout(uint32_t milliseconds)
folly::ThreadLocal< std::shared_ptr< IOThread > > thisThread_
IOThreadPoolExecutor(size_t numThreads, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("IOThreadPool"), folly::EventBaseManager *ebm=folly::EventBaseManager::get(), bool waitForAll=false)
void runBeforeLoop(LoopCallback *callback)
Definition: EventBase.cpp:548
void stopThreads(size_t n) override
static void flushLocalMallocCaches()
Definition: MemoryIdler.cpp:41
std::shared_ptr< IOThread > pickThread()