35 "Idle time before ThreadPoolExecutor threads are joined");
40 std::shared_ptr<ThreadFactory> threadFactory,
42 : threadFactory_(
std::
move(threadFactory)),
43 isWaitForAll_(isWaitForAll),
45 threadPoolHook_(
"folly::ThreadPoolExecutor"),
46 minThreads_(minThreads),
47 threadTimeout_(FLAGS_threadtimeout_ms) {
55 tpe.erase(std::remove(tpe.begin(), tpe.end(),
this), tpe.end());
62 Func&& expireCallback)
64 expiration_(expiration),
65 expireCallback_(
std::
move(expireCallback)),
74 task.stats_.waitTime = startTime - task.enqueueTime_;
75 if (task.expiration_ > std::chrono::milliseconds(0) &&
76 task.stats_.waitTime >= task.expiration_) {
77 task.stats_.expired =
true;
78 if (task.expireCallback_ !=
nullptr) {
79 task.expireCallback_();
85 }
catch (
const std::exception& e) {
86 LOG(ERROR) <<
"ThreadPoolExecutor: func threw unhandled " 87 <<
typeid(e).
name() <<
" exception: " << e.what();
89 LOG(ERROR) <<
"ThreadPoolExecutor: func threw unhandled non-exception " 96 thread->taskStatsCallbacks->callbackList.withRLock([&](
auto& callbacks) {
97 *thread->taskStatsCallbacks->inCallback =
true;
99 *thread->taskStatsCallbacks->inCallback =
false;
102 for (
auto& callback : callbacks) {
103 callback(task.stats_);
105 }
catch (
const std::exception& e) {
106 LOG(ERROR) <<
"ThreadPoolExecutor: task stats callback threw " 108 <<
typeid(e).
name() <<
" exception: " << e.what();
110 LOG(ERROR) <<
"ThreadPoolExecutor: task stats callback threw " 111 "unhandled non-exception object";
117 return maxThreads_.load(std::memory_order_relaxed);
143 size_t numThreadsToJoin = 0;
147 maxThreads_.store(numThreads, std::memory_order_relaxed);
149 auto minthreads =
minThreads_.load(std::memory_order_relaxed);
150 if (numThreads < minthreads) {
152 minThreads_.store(numThreads, std::memory_order_relaxed);
154 if (active > numThreads) {
156 if (numThreadsToJoin > active - minthreads) {
157 numThreadsToJoin = active - minthreads;
161 active - numThreadsToJoin, std::memory_order_relaxed);
162 }
else if (pending > 0 ||
observers_.size() > 0 || active < minthreads) {
163 size_t numToAdd =
std::min(pending, numThreads - active);
165 numToAdd = numThreads - active;
167 if (active + numToAdd < minthreads) {
168 numToAdd = minthreads - active;
171 activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
181 std::vector<ThreadPtr> newThreads;
182 for (
size_t i = 0;
i < n;
i++) {
185 for (
auto& thread : newThreads) {
192 for (
auto& thread : newThreads) {
193 thread->startupBaton.wait();
196 for (
auto& thread : newThreads) {
197 o->threadStarted(thread.get());
209 for (
size_t i = 0;
i < n;
i++) {
211 thread->handle.join();
251 for (
auto tpe : tpes) {
261 size_t activeTasks = 0;
262 size_t idleAlive = 0;
265 const std::chrono::nanoseconds idleTime =
now - thread->lastActiveTime;
289 if (ntf ==
nullptr) {
293 return ntf->getNamePrefix();
300 throw std::runtime_error(
"cannot subscribe in task stats callback");
316 if (queue_.size() > 0) {
328 std::chrono::milliseconds
time) {
332 if (queue_.size() > 0) {
338 if (!sem_.try_wait_for(time)) {
346 return queue_.size();
354 o->threadPreviouslyStarted(thread.get());
366 o->threadNotYetStopped(thread.get());
403 std::memory_order_relaxed);
420 std::memory_order_relaxed);
426 std::memory_order_relaxed);
443 auto total =
maxThreads_.load(std::memory_order_relaxed);
445 if (active >= total) {
452 total =
maxThreads_.load(std::memory_order_relaxed);
453 if (active >= total) {
static void runTask(const ThreadPtr &thread, Task &&task)
std::atomic< size_t > activeThreads_
virtual size_t getPendingTaskCountImpl()=0
std::chrono::steady_clock::time_point enqueueTime_
SharedMutex threadListLock_
std::atomic< size_t > threadsToJoin_
ThreadPoolExecutor(size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
virtual void threadRun(ThreadPtr thread)=0
void asymmetricHeavyBarrier(AMBFlags flags)
void addObserver(std::shared_ptr< Observer >)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
A reference wrapper for callable objects.
void add(const ThreadPtr &state)
folly::Optional< ThreadPtr > try_take_for(std::chrono::milliseconds) override
virtual void stopThreads(size_t n)=0
ThreadPtr take() override
~ThreadPoolExecutor() override
std::shared_ptr< ThreadFactory > threadFactory_
—— Concurrent Priority Queue Implementation ——
std::shared_ptr< Thread > ThreadPtr
std::atomic< bool > isJoin_
std::atomic< size_t > maxThreads_
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
std::string toStdString(const folly::fbstring &s)
const std::vector< ThreadPtr > & get() const
std::vector< std::shared_ptr< Observer > > observers_
std::function< void(TaskStats)> TaskStatsCallback
Task(Func &&func, std::chrono::milliseconds expiration, Func &&expireCallback)
uint64_t pendingTaskCount
void removeObserver(std::shared_ptr< Observer >)
virtual ThreadPtr makeThread()
static void withAll(FunctionRef< void(ThreadPoolExecutor &)> f)
GuardImpl guard(ErrorHandler &&handler)
void addThreads(size_t n)
void setNumThreads(size_t numThreads)
FOLLY_ALWAYS_INLINE void asymmetricLightBarrier()
void ensureActiveThreads()
static std::atomic< uint64_t > nextId
size_t getPendingTaskCount()
BlockingQueueAddResult add(ThreadPtr item) override
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
size_t numActiveThreads()
folly::Synchronized< std::vector< ThreadPoolExecutor * >> SyncVecThreadPoolExecutors
StoppedThreadQueue stoppedThreads_
void removeThreads(size_t n, bool isJoin)
void joinStoppedThreads(size_t n)
void subscribeToTaskStats(TaskStatsCallback cb)
std::chrono::nanoseconds maxIdleTime
std::chrono::nanoseconds time()
fbstring demangle(const char *name)
SyncVecThreadPoolExecutors & getSyncVecThreadPoolExecutors()
std::atomic< size_t > minThreads_