21 #include <boost/thread.hpp> 36 using namespace folly;
40 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
49 TEST(ThreadPoolExecutorTest, CPUBasic) {
50 basic<CPUThreadPoolExecutor>();
53 TEST(IOThreadPoolExecutorTest, IOBasic) {
54 basic<IOThreadPoolExecutor>();
61 tpe.setNumThreads(50);
63 tpe.setNumThreads(150);
67 TEST(ThreadPoolExecutorTest, CPUResize) {
68 resize<CPUThreadPoolExecutor>();
71 TEST(ThreadPoolExecutorTest, IOResize) {
72 resize<IOThreadPoolExecutor>();
78 std::atomic<int> completed(0);
83 for (
int i = 0;
i < 1000;
i++) {
96 std::atomic<int> completed(0);
101 for (
int i = 0;
i < 10;
i++) {
108 TEST(ThreadPoolExecutorTest, CPUStop) {
109 stop<CPUThreadPoolExecutor>();
112 TEST(ThreadPoolExecutorTest, IOStop) {
119 std::atomic<int> completed(0);
124 for (
int i = 0;
i < 1000;
i++) {
131 TEST(ThreadPoolExecutorTest, CPUJoin) {
132 join<CPUThreadPoolExecutor>();
135 TEST(ThreadPoolExecutorTest, IOJoin) {
136 join<IOThreadPoolExecutor>();
142 std::atomic<int> completed(0);
147 for (
int i = 0;
i < 1000;
i++) {
160 std::atomic<int> completed(0);
165 for (
int i = 0;
i < 10;
i++) {
172 TEST(ThreadPoolExecutorTest, CPUDestroy) {
173 destroy<CPUThreadPoolExecutor>();
176 TEST(ThreadPoolExecutorTest, IODestroy) {
183 std::atomic<int> completed(0);
188 for (
int i = 0;
i < 1000;
i++) {
191 tpe.setNumThreads(5);
192 tpe.setNumThreads(15);
197 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
198 resizeUnderLoad<CPUThreadPoolExecutor>();
201 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
202 resizeUnderLoad<IOThreadPoolExecutor>();
209 auto stats = tpe.getPoolStats();
222 stats = tpe.getPoolStats();
232 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
233 poolStats<CPUThreadPoolExecutor>();
236 TEST(ThreadPoolExecutorTest, IOPoolStats) {
237 poolStats<IOThreadPoolExecutor>();
243 std::atomic<int>
c(0);
257 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
258 taskStats<CPUThreadPoolExecutor>();
261 TEST(ThreadPoolExecutorTest, IOTaskStats) {
262 taskStats<IOThreadPoolExecutor>();
268 std::atomic<int> statCbCount(0);
270 int i = statCbCount++;
279 std::atomic<int> expireCbCount(0);
280 auto expireCb = [&]() { expireCbCount++; };
281 tpe.add(
burnMs(10), seconds(60), expireCb);
282 tpe.add(
burnMs(10), milliseconds(10), expireCb);
288 TEST(ThreadPoolExecutorTest, CPUExpiration) {
289 expiration<CPUThreadPoolExecutor>();
292 TEST(ThreadPoolExecutorTest, IOExpiration) {
293 expiration<IOThreadPoolExecutor>();
296 template <
typename TPE>
299 std::atomic<int>
c{0};
316 fe.
addFuture([]() {
throw std::runtime_error(
"oops"); })
324 auto p = std::make_shared<Promise<int>>();
325 std::thread
t([p]() {
330 return p->getFuture();
342 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
343 futureExecutor<CPUThreadPoolExecutor>();
346 TEST(ThreadPoolExecutorTest, IOFuturePool) {
347 futureExecutor<IOThreadPoolExecutor>();
350 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
351 bool tookLopri =
false;
364 for (
int i = 0;
i < 50;
i++) {
367 for (
int i = 0;
i < 50;
i++) {
394 std::atomic<int> threads_{0};
397 TEST(ThreadPoolExecutorTest, IOObserver) {
398 auto observer = std::make_shared<TestObserver>();
410 observer->checkCalls();
413 TEST(ThreadPoolExecutorTest, CPUObserver) {
414 auto observer = std::make_shared<TestObserver>();
426 observer->checkCalls();
429 TEST(ThreadPoolExecutorTest, AddWithPriority) {
430 std::atomic_int
c{0};
431 auto f = [&] {
c++; };
438 cpuExe.addWithPriority(
f, -1);
439 cpuExe.addWithPriority(
f, 0);
440 cpuExe.addWithPriority(
f, 1);
441 cpuExe.addWithPriority(
f, -2);
442 cpuExe.addWithPriority(
f, 2);
451 std::atomic_int
c{0};
456 const int kQueueCapacity = 1;
457 const int kThreads = 1;
466 std::make_shared<NamedThreadFactory>(
"CPUThreadPool"));
471 for (
int i = 0;
i < 5;
i++) {
479 TEST(PriorityThreadFactoryTest, ThreadPriority) {
481 auto currentPriority = getpriority(PRIO_PROCESS, 0);
489 int desiredPriority =
std::min(20, currentPriority + 1);
492 std::make_shared<NamedThreadFactory>(
"stuff"), desiredPriority);
493 int actualPriority = -21;
494 factory.
newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
496 EXPECT_EQ(desiredPriority, actualPriority);
499 TEST(InitThreadFactoryTest, InitializerCalled) {
500 int initializerCalledCount = 0;
502 std::make_shared<NamedThreadFactory>(
"test"),
503 [&initializerCalledCount] { initializerCalledCount++; });
506 [&initializerCalledCount]() {
EXPECT_EQ(initializerCalledCount, 1); })
511 TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
512 bool initializerCalled =
false;
513 bool taskBodyCalled =
false;
514 bool finalizerCalled =
false;
517 std::make_shared<NamedThreadFactory>(
"test"),
523 initializerCalled =
true;
530 finalizerCalled =
true;
538 taskBodyCalled =
true;
583 std::this_thread::sleep_for(milliseconds(50));
591 template <
typename Q>
596 std::atomic<int> turn{};
598 std::thread consumer1([&] {
602 std::thread consumer2([&] {
607 std::thread producer1([&] {
615 std::thread producer2([&] {
629 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
630 bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
633 template <
typename T>
638 TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
639 bugD3527722_test<UBQ<SlowMover>>();
642 template <
typename TPE>
647 std::thread::id id1, id2;
651 .thenValue([&id1](
auto&&) {
653 id1 = std::this_thread::get_id();
655 .thenValue([&id2](
auto&&) {
657 id2 = std::this_thread::get_id();
668 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
669 removeThreadTest<IOThreadPoolExecutor>();
672 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
673 removeThreadTest<CPUThreadPoolExecutor>();
676 template <
typename TPE>
681 std::atomic<int> completed(0);
686 for (
int i = 0;
i < 1000;
i++) {
689 tpe.setNumThreads(8);
691 tpe.setNumThreads(5);
693 tpe.setNumThreads(15);
699 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
700 resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
703 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
704 resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
707 template <
typename TPE>
709 auto executor = std::make_unique<TPE>(4);
714 auto&&) {
return 42; })
723 TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
724 keepAliveTest<IOThreadPoolExecutor>();
727 TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
728 keepAliveTest<CPUThreadPoolExecutor>();
737 template <
typename TPE>
752 TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
753 registersToExecutorListTest<IOThreadPoolExecutor>();
756 TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
757 registersToExecutorListTest<CPUThreadPoolExecutor>();
760 template <
typename TPE>
762 auto ntf = std::make_shared<NamedThreadFactory>(
"my_executor");
767 TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
768 testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
771 TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
772 testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
775 TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
776 boost::barrier barrier{3};
777 auto twice_waiting_task = [&] { barrier.wait(), barrier.wait(); };
780 e.
add(twice_waiting_task);
781 e.
add(twice_waiting_task);
794 TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
797 std::atomic<uint64_t>
count{0};
798 for (
int i = 0;
i < 10000;
i++) {
801 count.fetch_add(1, std::memory_order_relaxed);
810 TEST(ThreadPoolExecutorTest, AddPerf) {
816 std::make_shared<NamedThreadFactory>(
"CPUThreadPool"));
817 e.setThreadDeathTimeout(std::chrono::milliseconds(1));
818 for (
int i = 0;
i < 10000;
i++) {
819 e.add([&]() { e.add([]() { usleep(1000); }); });
824 template <
typename TPE>
834 .thenValue([](
auto&&) {
burnMs(100)(); })
835 .thenValue([&](
auto&&) { ++
counter; })
837 .thenValue([](
auto&&) {
burnMs(100)(); })
838 .thenValue([&](
auto&&) { ++
counter; });
844 template <
typename TPE>
856 .thenValue([&](
auto&&) {
861 .thenValue([&](
auto&&) { ++
counter; })
866 bool functionDestroyed{
false};
867 bool functionCalled{
false};
871 std::this_thread::sleep_for(100ms);
872 functionDestroyed =
true;
875 functionCalled =
true;
886 TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
887 WeakRefTest<IOThreadPoolExecutor>();
890 TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
891 WeakRefTest<CPUThreadPoolExecutor>();
894 TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
895 virtualExecutorTest<IOThreadPoolExecutor>();
898 TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
899 virtualExecutorTest<CPUThreadPoolExecutor>();
bool hasCallback() override
static void testUsesNameFromNamedThreadFactory()
spin_result spin_yield_until(std::chrono::time_point< Clock, Duration > const &deadline, F f)
static void registersToExecutorListTest()
static void futureExecutor()
#define EXPECT_NO_THROW(statement)
#define EXPECT_THROW(statement, expected_exception)
#define ASSERT_EQ(val1, val2)
SlowMover(bool slow_=false)
void destroy< IOThreadPoolExecutor >()
void setContextData(const RequestToken &val, std::unique_ptr< RequestData > data)
#define EXPECT_EQ(val1, val2)
static const int8_t LO_PRI
Future< Unit > sleep(Duration dur, Timekeeper *tk)
void addObserver(std::shared_ptr< Observer >)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
SlowMover & operator=(SlowMover &&other) noexcept
std::enable_if< folly::isFuture< invoke_result_t< F > >::value, invoke_result_t< F > >::type addFuture(F func)
—— Concurrent Priority Queue Implementation ——
in_place_tag in_place(in_place_tag={})
requires E e noexcept(noexcept(s.error(std::move(e))))
#define EXPECT_GE(val1, val2)
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle *) override
static void resizeThreadWhileExecutingTest()
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
void add(Func f) override
SlowMover(SlowMover &&other) noexcept
static void resizeUnderLoad()
void add(Func func) override
void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle *) override
void removeObserver(std::shared_ptr< Observer >)
virtual void addWithPriority(Func, int8_t priority)
static void withAll(FunctionRef< void(ThreadPoolExecutor &)> f)
constexpr auto data(C &c) -> decltype(c.data())
GuardImpl guard(ErrorHandler &&handler)
std::chrono::nanoseconds runTime
static void removeThreadTest()
void setThreadDeathTimeout(std::chrono::milliseconds timeout)
void add(Func func) override
void setNumThreads(size_t numThreads)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
int getNumThreadPoolExecutors()
#define EXPECT_TRUE(condition)
std::thread newThread(Func &&func) override
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
std::chrono::nanoseconds waitTime
std::thread newThread(Func &&func) override
static Func burnMs(uint64_t ms)
std::atomic< int > counter
#define EXPECT_NE(val1, val2)
void threadStarted(ThreadPoolExecutor::ThreadHandle *) override
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
void join(const Delim &delimiter, Iterator begin, Iterator end, String &output)
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
#define EXPECT_FALSE(condition)
void addWithPriority(Func f, int8_t priority) override
static void virtualExecutorTest()
void throwSystemError(Args &&...args)
static const int8_t HI_PRI
#define EXPECT_LT(val1, val2)
static void WeakRefTest()
void threadStopped(ThreadPoolExecutor::ThreadHandle *) override
RequestData * getContextData(const RequestToken &val)
#define ASSERT_TRUE(condition)
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static RequestContext * get()
Future< typename std::decay< T >::type > makeFuture(T &&t)
#define EXPECT_GT(val1, val2)
void stop< IOThreadPoolExecutor >()