proxygen
CPUThreadPoolExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 
23  dynamic_cputhreadpoolexecutor,
24  true,
25  "CPUThreadPoolExecutor will dynamically create and destroy threads");
26 
27 namespace folly {
28 
29 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14; // == 16384
30 
32  size_t numThreads,
33  std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
34  std::shared_ptr<ThreadFactory> threadFactory)
36  numThreads,
37  FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
38  std::move(threadFactory)),
39  taskQueue_(std::move(taskQueue)) {
40  setNumThreads(numThreads);
41 }
42 
44  std::pair<size_t, size_t> numThreads,
45  std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
46  std::shared_ptr<ThreadFactory> threadFactory)
48  numThreads.first,
49  numThreads.second,
50  std::move(threadFactory)),
51  taskQueue_(std::move(taskQueue)) {
52  setNumThreads(numThreads.first);
53 }
54 
56  size_t numThreads,
57  std::shared_ptr<ThreadFactory> threadFactory)
59  numThreads,
62  std::move(threadFactory)) {}
63 
65  std::pair<size_t, size_t> numThreads,
66  std::shared_ptr<ThreadFactory> threadFactory)
68  numThreads,
71  std::move(threadFactory)) {}
72 
75  numThreads,
76  std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
77 
79  size_t numThreads,
80  int8_t numPriorities,
81  std::shared_ptr<ThreadFactory> threadFactory)
83  numThreads,
85  numPriorities,
87  std::move(threadFactory)) {}
88 
90  size_t numThreads,
91  int8_t numPriorities,
92  size_t maxQueueSize,
93  std::shared_ptr<ThreadFactory> threadFactory)
95  numThreads,
97  numPriorities,
98  maxQueueSize),
99  std::move(threadFactory)) {}
100 
102  stop();
103  CHECK(threadsToStop_ == 0);
104 }
105 
107  add(std::move(func), std::chrono::milliseconds(0));
108 }
109 
111  Func func,
112  std::chrono::milliseconds expiration,
113  Func expireCallback) {
114  auto result = taskQueue_->add(
115  CPUTask(std::move(func), expiration, std::move(expireCallback)));
116  if (!result.reusedThread) {
118  }
119 }
120 
122  add(std::move(func), priority, std::chrono::milliseconds(0));
123 }
124 
126  Func func,
127  int8_t priority,
128  std::chrono::milliseconds expiration,
129  Func expireCallback) {
130  CHECK(getNumPriorities() > 0);
131  auto result = taskQueue_->addWithPriority(
132  CPUTask(std::move(func), expiration, std::move(expireCallback)),
133  priority);
134  if (!result.reusedThread) {
136  }
137 }
138 
140  return taskQueue_->getNumPriorities();
141 }
142 
144  return taskQueue_->size();
145 }
146 
149  return taskQueue_.get();
150 }
151 
152 // threadListLock_ must be write-locked.
154  auto toStop = threadsToStop_.load(std::memory_order_relaxed);
155  if (toStop <= 0) {
156  return false;
157  }
158  threadsToStop_.store(toStop - 1, std::memory_order_relaxed);
159  return true;
160 }
161 
163  if (tryDecrToStop()) {
164  return true;
165  }
166  if (task) {
167  return false;
168  } else {
169  return tryTimeoutThread();
170  }
171  return true;
172 }
173 
176 
177  thread->startupBaton.post();
178  while (true) {
179  auto task = taskQueue_->try_take_for(threadTimeout_);
180  // Handle thread stopping, triggered either by a timeout while waiting
181  // for a task, or by a 'poison' task added in join() or stop().
182  if (UNLIKELY(!task || task.value().poison)) {
183  // Actually remove the thread from the list.
185  if (taskShouldStop(task)) {
186  for (auto& o : observers_) {
187  o->threadStopped(thread.get());
188  }
189  threadList_.remove(thread);
190  stoppedThreads_.add(thread);
191  return;
192  } else {
193  continue;
194  }
195  }
196 
197  runTask(thread, std::move(task.value()));
198 
199  if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
201  if (tryDecrToStop()) {
202  threadList_.remove(thread);
203  stoppedThreads_.add(thread);
204  return;
205  }
206  }
207  }
208 }
209 
211  threadsToStop_ += n;
212  for (size_t i = 0; i < n; i++) {
213  taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
214  }
215 }
216 
217 // threadListLock_ is read (or write) locked.
219  return taskQueue_->size();
220 }
221 
222 } // namespace folly
static void runTask(const ThreadPtr &thread, Task &&task)
void addWithPriority(Func func, int8_t priority) override
std::unique_ptr< BlockingQueue< CPUTask > > taskQueue_
std::chrono::milliseconds threadTimeout_
static const int8_t LO_PRI
Definition: Executor.h:48
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::ThreadPoolListHook threadPoolHook_
STL namespace.
void stopThreads(size_t n) override
uint8_t getNumPriorities() const override
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::shared_ptr< Thread > ThreadPtr
std::atomic< bool > isJoin_
bool taskShouldStop(folly::Optional< CPUTask > &)
static void expiration()
BlockingQueue< CPUTask > * getTaskQueue()
std::vector< std::shared_ptr< Observer > > observers_
void remove(const ThreadPtr &state)
void threadRun(ThreadPtr thread) override
DEFINE_bool(dynamic_cputhreadpoolexecutor, true,"CPUThreadPoolExecutor will dynamically create and destroy threads")
void setNumThreads(size_t numThreads)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
BlockingQueueAddResult add(ThreadPtr item) override
StoppedThreadQueue stoppedThreads_
CPUThreadPoolExecutor(size_t numThreads, std::unique_ptr< BlockingQueue< CPUTask >> taskQueue, std::shared_ptr< ThreadFactory > threadFactory=std::make_shared< NamedThreadFactory >("CPUThreadPool"))
static const size_t kDefaultMaxQueueSize
#define UNLIKELY(x)
Definition: Likely.h:48
std::atomic< ssize_t > threadsToStop_
constexpr detail::First first
Definition: Base-inl.h:2553