31 #include <glog/logging.h> 60 std::shared_ptr<ThreadFactory> threadFactory,
61 bool isWaitForAll =
false);
65 void add(
Func func)
override = 0;
117 TaskStats() : expired(false), waitTime(0), runTime(0) {}
169 struct alignas(hardware_destructive_interference_size)
Thread 178 ~
Thread()
override =
default;
194 std::chrono::milliseconds expiration,
195 Func&& expireCallback);
204 static void runTask(
const ThreadPtr& thread,
Task&& task);
208 virtual void threadRun(ThreadPtr thread) = 0;
223 return std::make_shared<Thread>(
this);
232 auto it = std::lower_bound(
241 [&](
const ThreadPtr& ts1,
const ThreadPtr& ts2) ->
bool {
242 return compare(ts1, ts2);
244 vec_.insert(it, state);
247 void remove(
const ThreadPtr&
state) {
248 auto itPair = std::equal_range(
253 [&](
const ThreadPtr& ts1,
const ThreadPtr& ts2) ->
bool {
254 return compare(ts1, ts2);
256 CHECK(itPair.first != vec_.end());
257 CHECK(
std::next(itPair.first) == itPair.second);
258 vec_.erase(itPair.first);
261 const std::vector<ThreadPtr>&
get()
const {
266 static bool compare(
const ThreadPtr& ts1,
const ThreadPtr& ts2) {
267 return ts1->id < ts2->id;
276 ThreadPtr
take()
override;
277 size_t size()
override;
279 std::chrono::milliseconds )
override;
static void runTask(const ThreadPtr &thread, Task &&task)
std::atomic< size_t > activeThreads_
virtual size_t getPendingTaskCountImpl()=0
std::chrono::steady_clock::time_point enqueueTime_
SharedMutex threadListLock_
std::atomic< size_t > threadsToJoin_
folly::ThreadLocal< bool > inCallback
ThreadPoolExecutor(size_t maxThreads, size_t minThreads, std::shared_ptr< ThreadFactory > threadFactory, bool isWaitForAll=false)
std::chrono::milliseconds threadTimeout_
static bool compare(const ThreadPtr &ts1, const ThreadPtr &ts2)
virtual void threadRun(ThreadPtr thread)=0
void addObserver(std::shared_ptr< Observer >)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Thread(ThreadPoolExecutor *pool)
folly::ThreadPoolListHook threadPoolHook_
A reference wrapper for callable objects.
void add(const ThreadPtr &state)
virtual void stopThreads(size_t n)=0
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks
~ThreadPoolExecutor() override
std::shared_ptr< ThreadFactory > threadFactory_
—— Concurrent Priority Queue Implementation ——
std::shared_ptr< folly::RequestContext > context_
std::shared_ptr< Thread > ThreadPtr
std::atomic< bool > isJoin_
std::atomic< size_t > maxThreads_
std::vector< std::shared_ptr< Observer > > observers_
std::vector< ThreadPtr > vec_
constexpr auto size(C const &c) -> decltype(c.size())
void add(Func func) override=0
std::function< void(TaskStats)> TaskStatsCallback
uint64_t pendingTaskCount
std::chrono::steady_clock::time_point lastActiveTime
folly::Baton startupBaton
void removeObserver(std::shared_ptr< Observer >)
virtual ThreadPtr makeThread()
static void withAll(FunctionRef< void(ThreadPoolExecutor &)> f)
std::chrono::nanoseconds runTime
void addThreads(size_t n)
void setThreadDeathTimeout(std::chrono::milliseconds timeout)
std::shared_ptr< ThreadFactory > getThreadFactory()
void setNumThreads(size_t numThreads)
void ensureActiveThreads()
static std::atomic< uint64_t > nextId
virtual void threadPreviouslyStarted(ThreadHandle *h)
size_t getPendingTaskCount()
std::chrono::nanoseconds waitTime
T exchange(T &obj, U &&new_value)
std::chrono::milliseconds expiration_
std::shared_ptr< TaskStatsCallbackRegistry > taskStatsCallbacks_
void setThreadFactory(std::shared_ptr< ThreadFactory > threadFactory)
size_t numActiveThreads()
StoppedThreadQueue stoppedThreads_
detail::Take take(Number count)
void removeThreads(size_t n, bool isJoin)
std::queue< ThreadPtr > queue_
void joinStoppedThreads(size_t n)
void subscribeToTaskStats(TaskStatsCallback cb)
std::chrono::nanoseconds maxIdleTime
folly::Synchronized< std::vector< TaskStatsCallback > > callbackList
std::atomic< size_t > minThreads_
virtual void threadNotYetStopped(ThreadHandle *h)