ThreadPoolExecutor.cpp
/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/executors/ThreadPoolExecutor.h>

#include <folly/executors/GlobalThreadPoolList.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>

namespace folly {

using SyncVecThreadPoolExecutors =
    folly::Synchronized<std::vector<ThreadPoolExecutor*>>;

SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() {
  static Indestructible<SyncVecThreadPoolExecutors> storage;
  return *storage;
}

DEFINE_int64(
    threadtimeout_ms,
    60000,
    "Idle time before ThreadPoolExecutor threads are joined");

ThreadPoolExecutor::ThreadPoolExecutor(
    size_t /* maxThreads */,
    size_t minThreads,
    std::shared_ptr<ThreadFactory> threadFactory,
    bool isWaitForAll)
    : threadFactory_(std::move(threadFactory)),
      isWaitForAll_(isWaitForAll),
      taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
      threadPoolHook_("folly::ThreadPoolExecutor"),
      minThreads_(minThreads),
      threadTimeout_(FLAGS_threadtimeout_ms) {
  getSyncVecThreadPoolExecutors()->push_back(this);
}

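// Illustrative usage sketch (not part of the original file): ThreadPoolExecutor
// is an abstract base; pools are created through a concrete subclass such as
// CPUThreadPoolExecutor. The --threadtimeout_ms gflag above controls how long
// an idle thread lingers before it is stopped and joined.
//
//   #include <folly/executors/CPUThreadPoolExecutor.h>
//
//   folly::CPUThreadPoolExecutor pool(/* numThreads = */ 4);
//   pool.add([] { LOG(INFO) << "ran on a pool thread"; });
//   pool.join(); // drain queued work, then join all threads
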
ThreadPoolExecutor::~ThreadPoolExecutor() {
  CHECK_EQ(0, threadList_.get().size());
  getSyncVecThreadPoolExecutors().withWLock([this](auto& tpe) {
    tpe.erase(std::remove(tpe.begin(), tpe.end(), this), tpe.end());
  });
}

ThreadPoolExecutor::Task::Task(
    Func&& func,
    std::chrono::milliseconds expiration,
    Func&& expireCallback)
    : func_(std::move(func)),
      expiration_(expiration),
      expireCallback_(std::move(expireCallback)),
      context_(folly::RequestContext::saveContext()) {
  // Assume that the task is enqueued on creation
  enqueueTime_ = std::chrono::steady_clock::now();
}

void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
  thread->idle = false;
  auto startTime = std::chrono::steady_clock::now();
  task.stats_.waitTime = startTime - task.enqueueTime_;
  if (task.expiration_ > std::chrono::milliseconds(0) &&
      task.stats_.waitTime >= task.expiration_) {
    task.stats_.expired = true;
    if (task.expireCallback_ != nullptr) {
      task.expireCallback_();
    }
  } else {
    folly::RequestContextScopeGuard rctx(task.context_);
    try {
      task.func_();
    } catch (const std::exception& e) {
      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
                 << typeid(e).name() << " exception: " << e.what();
    } catch (...) {
      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
                    "object";
    }
    task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
  }
  thread->idle = true;
  thread->lastActiveTime = std::chrono::steady_clock::now();
  thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
    *thread->taskStatsCallbacks->inCallback = true;
    SCOPE_EXIT {
      *thread->taskStatsCallbacks->inCallback = false;
    };
    try {
      for (auto& callback : callbacks) {
        callback(task.stats_);
      }
    } catch (const std::exception& e) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled "
                 << typeid(e).name() << " exception: " << e.what();
    } catch (...) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled non-exception object";
    }
  });
}

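// Illustrative sketch (not part of the original file): task expiration as
// handled in runTask() above. Concrete subclasses such as CPUThreadPoolExecutor
// expose an add() overload taking an expiration and an expiry callback; a task
// that waits in the queue longer than its expiration is dropped and the
// callback runs in its place.
//
//   folly::CPUThreadPoolExecutor pool(1);
//   pool.add(
//       [] { /* work */ },
//       std::chrono::milliseconds(10),           // expiration
//       [] { LOG(WARNING) << "task expired"; }); // expireCallback
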
size_t ThreadPoolExecutor::numThreads() {
  return maxThreads_.load(std::memory_order_relaxed);
}

size_t ThreadPoolExecutor::numActiveThreads() {
  return activeThreads_.load(std::memory_order_relaxed);
}

// Set the maximum number of running threads.
void ThreadPoolExecutor::setNumThreads(size_t numThreads) {
  /* Since ThreadPoolExecutor may be dynamically adjusting the number of
     threads, we adjust the relevant variables instead of changing
     the number of threads directly. Roughly:

     If numThreads < minThreads, reset minThreads to numThreads.

     If numThreads < active threads, reduce the number of running threads.

     If the number of pending tasks is > 0, then increase the currently
     active number of threads such that we can run all the tasks, or reach
     numThreads.

     Note that if there are observers, we actually have to create all
     the threads, because some observer implementations need to 'observe'
     all thread creation (see tests for an example of this).
   */

  size_t numThreadsToJoin = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    auto pending = getPendingTaskCountImpl();
    maxThreads_.store(numThreads, std::memory_order_relaxed);
    auto active = activeThreads_.load(std::memory_order_relaxed);
    auto minthreads = minThreads_.load(std::memory_order_relaxed);
    if (numThreads < minthreads) {
      minthreads = numThreads;
      minThreads_.store(numThreads, std::memory_order_relaxed);
    }
    if (active > numThreads) {
      numThreadsToJoin = active - numThreads;
      if (numThreadsToJoin > active - minthreads) {
        numThreadsToJoin = active - minthreads;
      }
      ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
      activeThreads_.store(
          active - numThreadsToJoin, std::memory_order_relaxed);
    } else if (pending > 0 || observers_.size() > 0 || active < minthreads) {
      size_t numToAdd = std::min(pending, numThreads - active);
      if (observers_.size() > 0) {
        numToAdd = numThreads - active;
      }
      if (active + numToAdd < minthreads) {
        numToAdd = minthreads - active;
      }
      ThreadPoolExecutor::addThreads(numToAdd);
      activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
    }
  }

  /* We may have removed some threads; attempt to join them. */
  joinStoppedThreads(numThreadsToJoin);
}
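
// Illustrative sketch (not part of the original file): dynamic resizing.
// Assumes a CPUThreadPoolExecutor. Growing only raises the thread cap; threads
// are created lazily as work arrives (see ensureActiveThreads() below).
// Shrinking stops excess threads and joins them before returning.
//
//   folly::CPUThreadPoolExecutor pool(8);
//   pool.setNumThreads(16); // raise the cap; threads start on demand
//   pool.setNumThreads(2);  // stop and join up to 6 running threads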

// threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) {
  std::vector<ThreadPtr> newThreads;
  for (size_t i = 0; i < n; i++) {
    newThreads.push_back(makeThread());
  }
  for (auto& thread : newThreads) {
    // TODO need a notion of failing to create the thread
    // and then handling for that case
    thread->handle = threadFactory_->newThread(
        std::bind(&ThreadPoolExecutor::threadRun, this, thread));
    threadList_.add(thread);
  }
  for (auto& thread : newThreads) {
    thread->startupBaton.wait();
  }
  for (auto& o : observers_) {
    for (auto& thread : newThreads) {
      o->threadStarted(thread.get());
    }
  }
}

// threadListLock_ is writelocked
void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
  isJoin_ = isJoin;
  stopThreads(n);
}

void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
  for (size_t i = 0; i < n; i++) {
    auto thread = stoppedThreads_.take();
    thread->handle.join();
  }
}

void ThreadPoolExecutor::stop() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, false);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

void ThreadPoolExecutor::join() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, true);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
  getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
    for (auto tpe : tpes) {
      f(*tpe);
    }
  });
}
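
// Illustrative sketch (not part of the original file): withAll() visits every
// live ThreadPoolExecutor in the process, e.g. to dump aggregate info.
//
//   folly::ThreadPoolExecutor::withAll([](folly::ThreadPoolExecutor& ex) {
//     LOG(INFO) << ex.getName() << ": " << ex.numThreads() << " threads";
//   });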

ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
  const auto now = std::chrono::steady_clock::now();
  SharedMutex::ReadHolder r{&threadListLock_};
  ThreadPoolExecutor::PoolStats stats;
  size_t activeTasks = 0;
  size_t idleAlive = 0;
  for (auto thread : threadList_.get()) {
    if (thread->idle) {
      const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
      stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
      idleAlive++;
    } else {
      activeTasks++;
    }
  }
  stats.pendingTaskCount = getPendingTaskCountImpl();
  stats.totalTaskCount = stats.pendingTaskCount + activeTasks;

  stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
  stats.activeThreadCount =
      activeThreads_.load(std::memory_order_relaxed) - idleAlive;
  stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
  return stats;
}
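
// Illustrative sketch (not part of the original file): reading pool stats,
// assuming a `pool` as in the earlier sketches.
//
//   auto stats = pool.getPoolStats();
//   LOG(INFO) << "threads=" << stats.threadCount
//             << " active=" << stats.activeThreadCount
//             << " idle=" << stats.idleThreadCount
//             << " pending=" << stats.pendingTaskCount;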

size_t ThreadPoolExecutor::getPendingTaskCount() {
  SharedMutex::ReadHolder r{&threadListLock_};
  return getPendingTaskCountImpl();
}

std::string ThreadPoolExecutor::getName() {
  auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
  if (ntf == nullptr) {
    return folly::demangle(typeid(*this).name()).toStdString();
  }

  return ntf->getNamePrefix();
}

std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);

void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
  if (*taskStatsCallbacks_->inCallback) {
    throw std::runtime_error("cannot subscribe in task stats callback");
  }
  taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
}

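// Illustrative sketch (not part of the original file): observing per-task
// timing via the stats callbacks invoked at the end of runTask().
//
//   pool.subscribeToTaskStats([](folly::ThreadPoolExecutor::TaskStats stats) {
//     LOG(INFO) << "waited " << stats.waitTime.count() << "ns, ran "
//               << stats.runTime.count() << "ns"
//               << (stats.expired ? " (expired)" : "");
//   });
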
BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add(
    ThreadPoolExecutor::ThreadPtr item) {
  std::lock_guard<std::mutex> guard(mutex_);
  queue_.push(std::move(item));
  return sem_.post();
}

ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (queue_.size() > 0) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    sem_.wait();
  }
}

folly::Optional<ThreadPoolExecutor::ThreadPtr>
ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
    std::chrono::milliseconds time) {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (queue_.size() > 0) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    if (!sem_.try_wait_for(time)) {
      return folly::none;
    }
  }
}

size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
  std::lock_guard<std::mutex> guard(mutex_);
  return queue_.size();
}

void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    observers_.push_back(o);
    for (auto& thread : threadList_.get()) {
      o->threadPreviouslyStarted(thread.get());
    }
  }
  while (activeThreads_.load(std::memory_order_relaxed) <
         maxThreads_.load(std::memory_order_relaxed)) {
    ensureActiveThreads();
  }
}

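// Illustrative sketch (not part of the original file): a minimal Observer that
// counts pool threads as they start and stop.
//
//   struct CountingObserver : folly::ThreadPoolExecutor::Observer {
//     void threadStarted(folly::ThreadPoolExecutor::ThreadHandle*) override {
//       ++count;
//     }
//     void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override {
//       --count;
//     }
//     std::atomic<int> count{0};
//   };
//
//   pool.addObserver(std::make_shared<CountingObserver>());
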
void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
  SharedMutex::WriteHolder w{&threadListLock_};
  for (auto& thread : threadList_.get()) {
    o->threadNotYetStopped(thread.get());
  }

  for (auto it = observers_.begin(); it != observers_.end(); it++) {
    if (*it == o) {
      observers_.erase(it);
      return;
    }
  }
  DCHECK(false);
}

// Idle threads may have destroyed themselves; attempt to join
// them here.
void ThreadPoolExecutor::ensureJoined() {
  auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
  if (tojoin) {
    {
      SharedMutex::WriteHolder w{&threadListLock_};
      tojoin = threadsToJoin_.load(std::memory_order_relaxed);
      threadsToJoin_.store(0, std::memory_order_relaxed);
    }
    joinStoppedThreads(tojoin);
  }
}

// threadListLock_ must be write locked.
bool ThreadPoolExecutor::tryTimeoutThread() {
  // Try to stop based on idle thread timeout (try_take_for),
  // if there are at least minThreads running.
  if (!minActive()) {
    return false;
  }

  // Remove thread from active count
  activeThreads_.store(
      activeThreads_.load(std::memory_order_relaxed) - 1,
      std::memory_order_relaxed);

  // There is a memory ordering constraint w.r.t the queue
  // implementation's add() and getPendingTaskCountImpl() - while many
  // queues have seq_cst ordering, some do not, so add an explicit
  // barrier. tryTimeoutThread is the slow path and only happens once
  // every thread timeout; use asymmetric barrier to keep add() fast.
  asymmetricHeavyBarrier(AMBFlags::EXPEDITED);

  // If this is based on idle thread timeout, then
  // adjust vars appropriately (otherwise stop() or join()
  // does this).
  if (getPendingTaskCountImpl() > 0) {
    // There are still pending tasks; we can't stop yet.
    // Re-up active threads and return.
    activeThreads_.store(
        activeThreads_.load(std::memory_order_relaxed) + 1,
        std::memory_order_relaxed);
    return false;
  }

  threadsToJoin_.store(
      threadsToJoin_.load(std::memory_order_relaxed) + 1,
      std::memory_order_relaxed);

  return true;
}

// If we can't ensure that we were able to hand off a task to a thread,
// attempt to start a thread to handle the task, if we aren't already
// running the maximum number of threads.
void ThreadPoolExecutor::ensureActiveThreads() {
  ensureJoined();

  // Matches barrier in tryTimeoutThread(). Ensure the task added
  // is seen before loading activeThreads_ below.
  asymmetricLightBarrier();

  // Fast path assuming we are already at max threads.
  auto active = activeThreads_.load(std::memory_order_relaxed);
  auto total = maxThreads_.load(std::memory_order_relaxed);

  if (active >= total) {
    return;
  }

  SharedMutex::WriteHolder w{&threadListLock_};
  // Double check behind lock.
  active = activeThreads_.load(std::memory_order_relaxed);
  total = maxThreads_.load(std::memory_order_relaxed);
  if (active >= total) {
    return;
  }

  ThreadPoolExecutor::addThreads(1);
  activeThreads_.store(active + 1, std::memory_order_relaxed);
}

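// Why the barrier pair matters (explanatory sketch, not part of the original
// file): without it, a producer and a timing-out thread could miss each other:
//
//   producer (add + ensureActiveThreads)   idle thread (tryTimeoutThread)
//   ------------------------------------   ------------------------------
//   enqueue task                           activeThreads_-- (leaving)
//   asymmetricLightBarrier();              asymmetricHeavyBarrier(...);
//   load activeThreads_                    getPendingTaskCountImpl()
//
// The heavy/light pairing guarantees at least one side sees the other's
// write: either the producer observes the decremented activeThreads_ and
// starts a replacement thread, or the leaving thread observes the pending
// task and stays.
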
// If an idle thread times out, only join it if there are at least
// minThreads threads.
bool ThreadPoolExecutor::minActive() {
  return activeThreads_.load(std::memory_order_relaxed) >
      minThreads_.load(std::memory_order_relaxed);
}

} // namespace folly