proxygen
ThreadPoolExecutorTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <atomic>
18 #include <memory>
19 #include <thread>
20 
21 #include <boost/thread.hpp>
22 
23 #include <folly/Exception.h>
24 #include <folly/VirtualExecutor.h>
35 
36 using namespace folly;
37 using namespace std::chrono;
38 
39 static Func burnMs(uint64_t ms) {
40  return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
41 }
42 
43 template <class TPE>
44 static void basic() {
45  // Create and destroy
46  TPE tpe(10);
47 }
48 
49 TEST(ThreadPoolExecutorTest, CPUBasic) {
50  basic<CPUThreadPoolExecutor>();
51 }
52 
53 TEST(IOThreadPoolExecutorTest, IOBasic) {
54  basic<IOThreadPoolExecutor>();
55 }
56 
57 template <class TPE>
58 static void resize() {
59  TPE tpe(100);
60  EXPECT_EQ(100, tpe.numThreads());
61  tpe.setNumThreads(50);
62  EXPECT_EQ(50, tpe.numThreads());
63  tpe.setNumThreads(150);
64  EXPECT_EQ(150, tpe.numThreads());
65 }
66 
67 TEST(ThreadPoolExecutorTest, CPUResize) {
68  resize<CPUThreadPoolExecutor>();
69 }
70 
71 TEST(ThreadPoolExecutorTest, IOResize) {
72  resize<IOThreadPoolExecutor>();
73 }
74 
75 template <class TPE>
76 static void stop() {
77  TPE tpe(1);
78  std::atomic<int> completed(0);
79  auto f = [&]() {
80  burnMs(10)();
81  completed++;
82  };
83  for (int i = 0; i < 1000; i++) {
84  tpe.add(f);
85  }
86  tpe.stop();
87  EXPECT_GT(1000, completed);
88 }
89 
90 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
91 // to the event base, will be executed upon its destruction, and cannot be
92 // taken back.
93 template <>
95  IOThreadPoolExecutor tpe(1);
96  std::atomic<int> completed(0);
97  auto f = [&]() {
98  burnMs(10)();
99  completed++;
100  };
101  for (int i = 0; i < 10; i++) {
102  tpe.add(f);
103  }
104  tpe.stop();
105  EXPECT_EQ(10, completed);
106 }
107 
108 TEST(ThreadPoolExecutorTest, CPUStop) {
109  stop<CPUThreadPoolExecutor>();
110 }
111 
112 TEST(ThreadPoolExecutorTest, IOStop) {
114 }
115 
116 template <class TPE>
117 static void join() {
118  TPE tpe(10);
119  std::atomic<int> completed(0);
120  auto f = [&]() {
121  burnMs(1)();
122  completed++;
123  };
124  for (int i = 0; i < 1000; i++) {
125  tpe.add(f);
126  }
127  tpe.join();
128  EXPECT_EQ(1000, completed);
129 }
130 
131 TEST(ThreadPoolExecutorTest, CPUJoin) {
132  join<CPUThreadPoolExecutor>();
133 }
134 
135 TEST(ThreadPoolExecutorTest, IOJoin) {
136  join<IOThreadPoolExecutor>();
137 }
138 
139 template <class TPE>
140 static void destroy() {
141  TPE tpe(1);
142  std::atomic<int> completed(0);
143  auto f = [&]() {
144  burnMs(10)();
145  completed++;
146  };
147  for (int i = 0; i < 1000; i++) {
148  tpe.add(f);
149  }
150  tpe.stop();
151  EXPECT_GT(1000, completed);
152 }
153 
154 // IOThreadPoolExecutor's destructor joins all tasks. Outstanding tasks belong
155 // to the event base, will be executed upon its destruction, and cannot be
156 // taken back.
157 template <>
160  std::atomic<int> completed(0);
161  auto f = [&]() {
162  burnMs(10)();
163  completed++;
164  };
165  for (int i = 0; i < 10; i++) {
166  tpe->add(f);
167  }
168  tpe.clear();
169  EXPECT_EQ(10, completed);
170 }
171 
172 TEST(ThreadPoolExecutorTest, CPUDestroy) {
173  destroy<CPUThreadPoolExecutor>();
174 }
175 
176 TEST(ThreadPoolExecutorTest, IODestroy) {
178 }
179 
180 template <class TPE>
181 static void resizeUnderLoad() {
182  TPE tpe(10);
183  std::atomic<int> completed(0);
184  auto f = [&]() {
185  burnMs(1)();
186  completed++;
187  };
188  for (int i = 0; i < 1000; i++) {
189  tpe.add(f);
190  }
191  tpe.setNumThreads(5);
192  tpe.setNumThreads(15);
193  tpe.join();
194  EXPECT_EQ(1000, completed);
195 }
196 
197 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
198  resizeUnderLoad<CPUThreadPoolExecutor>();
199 }
200 
201 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
202  resizeUnderLoad<IOThreadPoolExecutor>();
203 }
204 
205 template <class TPE>
206 static void poolStats() {
207  folly::Baton<> startBaton, endBaton;
208  TPE tpe(1);
209  auto stats = tpe.getPoolStats();
210  EXPECT_GE(1, stats.threadCount);
211  EXPECT_GE(1, stats.idleThreadCount);
212  EXPECT_EQ(0, stats.activeThreadCount);
213  EXPECT_EQ(0, stats.pendingTaskCount);
214  EXPECT_EQ(0, tpe.getPendingTaskCount());
215  EXPECT_EQ(0, stats.totalTaskCount);
216  tpe.add([&]() {
217  startBaton.post();
218  endBaton.wait();
219  });
220  tpe.add([&]() {});
221  startBaton.wait();
222  stats = tpe.getPoolStats();
223  EXPECT_EQ(1, stats.threadCount);
224  EXPECT_EQ(0, stats.idleThreadCount);
225  EXPECT_EQ(1, stats.activeThreadCount);
226  EXPECT_EQ(1, stats.pendingTaskCount);
227  EXPECT_EQ(1, tpe.getPendingTaskCount());
228  EXPECT_EQ(2, stats.totalTaskCount);
229  endBaton.post();
230 }
231 
232 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
233  poolStats<CPUThreadPoolExecutor>();
234 }
235 
236 TEST(ThreadPoolExecutorTest, IOPoolStats) {
237  poolStats<IOThreadPoolExecutor>();
238 }
239 
240 template <class TPE>
241 static void taskStats() {
242  TPE tpe(1);
243  std::atomic<int> c(0);
244  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
245  int i = c++;
246  EXPECT_LT(milliseconds(0), stats.runTime);
247  if (i == 1) {
248  EXPECT_LT(milliseconds(0), stats.waitTime);
249  }
250  });
251  tpe.add(burnMs(10));
252  tpe.add(burnMs(10));
253  tpe.join();
254  EXPECT_EQ(2, c);
255 }
256 
257 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
258  taskStats<CPUThreadPoolExecutor>();
259 }
260 
261 TEST(ThreadPoolExecutorTest, IOTaskStats) {
262  taskStats<IOThreadPoolExecutor>();
263 }
264 
265 template <class TPE>
266 static void expiration() {
267  TPE tpe(1);
268  std::atomic<int> statCbCount(0);
269  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
270  int i = statCbCount++;
271  if (i == 0) {
272  EXPECT_FALSE(stats.expired);
273  } else if (i == 1) {
274  EXPECT_TRUE(stats.expired);
275  } else {
276  FAIL();
277  }
278  });
279  std::atomic<int> expireCbCount(0);
280  auto expireCb = [&]() { expireCbCount++; };
281  tpe.add(burnMs(10), seconds(60), expireCb);
282  tpe.add(burnMs(10), milliseconds(10), expireCb);
283  tpe.join();
284  EXPECT_EQ(2, statCbCount);
285  EXPECT_EQ(1, expireCbCount);
286 }
287 
288 TEST(ThreadPoolExecutorTest, CPUExpiration) {
289  expiration<CPUThreadPoolExecutor>();
290 }
291 
292 TEST(ThreadPoolExecutorTest, IOExpiration) {
293  expiration<IOThreadPoolExecutor>();
294 }
295 
296 template <typename TPE>
297 static void futureExecutor() {
298  FutureExecutor<TPE> fe(2);
299  std::atomic<int> c{0};
300  fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
301  c++;
302  EXPECT_EQ(42, t.value());
303  });
304  fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
305  c++;
306  EXPECT_EQ(100, t.value());
307  });
308  fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
309  c++;
310  EXPECT_NO_THROW(t.value());
311  });
312  fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
313  c++;
314  EXPECT_NO_THROW(t.value());
315  });
316  fe.addFuture([]() { throw std::runtime_error("oops"); })
317  .then([&](Try<Unit>&& t) {
318  c++;
319  EXPECT_THROW(t.value(), std::runtime_error);
320  });
321  // Test doing actual async work
322  folly::Baton<> baton;
323  fe.addFuture([&]() {
324  auto p = std::make_shared<Promise<int>>();
325  std::thread t([p]() {
326  burnMs(10)();
327  p->setValue(42);
328  });
329  t.detach();
330  return p->getFuture();
331  })
332  .then([&](Try<int>&& t) {
333  EXPECT_EQ(42, t.value());
334  c++;
335  baton.post();
336  });
337  baton.wait();
338  fe.join();
339  EXPECT_EQ(6, c);
340 }
341 
342 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
343  futureExecutor<CPUThreadPoolExecutor>();
344 }
345 
346 TEST(ThreadPoolExecutorTest, IOFuturePool) {
347  futureExecutor<IOThreadPoolExecutor>();
348 }
349 
350 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
351  bool tookLopri = false;
352  auto completed = 0;
353  auto hipri = [&] {
354  EXPECT_FALSE(tookLopri);
355  completed++;
356  };
357  auto lopri = [&] {
358  tookLopri = true;
359  completed++;
360  };
361  CPUThreadPoolExecutor pool(0, 2);
362  {
363  VirtualExecutor ve(pool);
364  for (int i = 0; i < 50; i++) {
366  }
367  for (int i = 0; i < 50; i++) {
369  }
370  pool.setNumThreads(1);
371  }
372  EXPECT_EQ(100, completed);
373 }
374 
376  public:
378  threads_++;
379  }
381  threads_--;
382  }
384  threads_++;
385  }
387  threads_--;
388  }
389  void checkCalls() {
390  ASSERT_EQ(threads_, 0);
391  }
392 
393  private:
394  std::atomic<int> threads_{0};
395 };
396 
397 TEST(ThreadPoolExecutorTest, IOObserver) {
398  auto observer = std::make_shared<TestObserver>();
399 
400  {
402  exe.addObserver(observer);
403  exe.setNumThreads(3);
404  exe.setNumThreads(0);
405  exe.setNumThreads(7);
406  exe.removeObserver(observer);
407  exe.setNumThreads(10);
408  }
409 
410  observer->checkCalls();
411 }
412 
413 TEST(ThreadPoolExecutorTest, CPUObserver) {
414  auto observer = std::make_shared<TestObserver>();
415 
416  {
418  exe.addObserver(observer);
419  exe.setNumThreads(3);
420  exe.setNumThreads(0);
421  exe.setNumThreads(7);
422  exe.removeObserver(observer);
423  exe.setNumThreads(10);
424  }
425 
426  observer->checkCalls();
427 }
428 
429 TEST(ThreadPoolExecutorTest, AddWithPriority) {
430  std::atomic_int c{0};
431  auto f = [&] { c++; };
432 
433  // IO exe doesn't support priorities
434  IOThreadPoolExecutor ioExe(10);
435  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
436 
437  CPUThreadPoolExecutor cpuExe(10, 3);
438  cpuExe.addWithPriority(f, -1);
439  cpuExe.addWithPriority(f, 0);
440  cpuExe.addWithPriority(f, 1);
441  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
442  cpuExe.addWithPriority(f, 2); // will add at the highest priority
443  cpuExe.addWithPriority(f, Executor::LO_PRI);
444  cpuExe.addWithPriority(f, Executor::HI_PRI);
445  cpuExe.join();
446 
447  EXPECT_EQ(7, c);
448 }
449 
450 TEST(ThreadPoolExecutorTest, BlockingQueue) {
451  std::atomic_int c{0};
452  auto f = [&] {
453  burnMs(1)();
454  c++;
455  };
456  const int kQueueCapacity = 1;
457  const int kThreads = 1;
458 
459  auto queue = std::make_unique<LifoSemMPMCQueue<
461  QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
462 
463  CPUThreadPoolExecutor cpuExe(
464  kThreads,
465  std::move(queue),
466  std::make_shared<NamedThreadFactory>("CPUThreadPool"));
467 
468  // Add `f` five times. It sleeps for 1ms every time. Calling
469  // `cpuExe.add()` is *almost* guaranteed to block because there's
470  // only 1 cpu worker thread.
471  for (int i = 0; i < 5; i++) {
472  EXPECT_NO_THROW(cpuExe.add(f));
473  }
474  cpuExe.join();
475 
476  EXPECT_EQ(5, c);
477 }
478 
479 TEST(PriorityThreadFactoryTest, ThreadPriority) {
480  errno = 0;
481  auto currentPriority = getpriority(PRIO_PROCESS, 0);
482  if (errno != 0) {
483  throwSystemError("failed to get current priority");
484  }
485 
486  // Non-root users can only increase the priority value. Make sure we are
487  // trying to go to a higher priority than we are currently running as, up to
488  // the maximum allowed of 20.
489  int desiredPriority = std::min(20, currentPriority + 1);
490 
491  PriorityThreadFactory factory(
492  std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);
493  int actualPriority = -21;
494  factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
495  .join();
496  EXPECT_EQ(desiredPriority, actualPriority);
497 }
498 
499 TEST(InitThreadFactoryTest, InitializerCalled) {
500  int initializerCalledCount = 0;
501  InitThreadFactory factory(
502  std::make_shared<NamedThreadFactory>("test"),
503  [&initializerCalledCount] { initializerCalledCount++; });
504  factory
505  .newThread(
506  [&initializerCalledCount]() { EXPECT_EQ(initializerCalledCount, 1); })
507  .join();
508  EXPECT_EQ(initializerCalledCount, 1);
509 }
510 
511 TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
512  bool initializerCalled = false;
513  bool taskBodyCalled = false;
514  bool finalizerCalled = false;
515 
516  InitThreadFactory factory(
517  std::make_shared<NamedThreadFactory>("test"),
518  [&] {
519  // thread initializer
520  EXPECT_FALSE(initializerCalled);
521  EXPECT_FALSE(taskBodyCalled);
522  EXPECT_FALSE(finalizerCalled);
523  initializerCalled = true;
524  },
525  [&] {
526  // thread finalizer
527  EXPECT_TRUE(initializerCalled);
528  EXPECT_TRUE(taskBodyCalled);
529  EXPECT_FALSE(finalizerCalled);
530  finalizerCalled = true;
531  });
532 
533  factory
534  .newThread([&]() {
535  EXPECT_TRUE(initializerCalled);
536  EXPECT_FALSE(taskBodyCalled);
537  EXPECT_FALSE(finalizerCalled);
538  taskBodyCalled = true;
539  })
540  .join();
541 
542  EXPECT_TRUE(initializerCalled);
543  EXPECT_TRUE(taskBodyCalled);
544  EXPECT_TRUE(finalizerCalled);
545 }
546 
547 class TestData : public folly::RequestData {
548  public:
549  explicit TestData(int data) : data_(data) {}
550  ~TestData() override {}
551 
552  bool hasCallback() override {
553  return false;
554  }
555 
556  int data_;
557 };
558 
559 TEST(ThreadPoolExecutorTest, RequestContext) {
561 
562  RequestContextScopeGuard rctx; // create new request context for this scope
563  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
564  RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
565  auto data = RequestContext::get()->getContextData("test");
566  EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
567 
568  executor.add([] {
569  auto data2 = RequestContext::get()->getContextData("test");
570  ASSERT_TRUE(data2 != nullptr);
571  EXPECT_EQ(42, dynamic_cast<TestData*>(data2)->data_);
572  });
573 }
574 
575 struct SlowMover {
576  explicit SlowMover(bool slow_ = false) : slow(slow_) {}
578  *this = std::move(other);
579  }
581  slow = other.slow;
582  if (slow) {
583  /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
584  }
585  return *this;
586  }
587 
588  bool slow;
589 };
590 
591 template <typename Q>
593  // Test that the queue does not get stuck if writes are completed in
594  // order opposite to how they are initiated.
595  Q q(1024);
596  std::atomic<int> turn{};
597 
598  std::thread consumer1([&] {
599  ++turn;
600  q.take();
601  });
602  std::thread consumer2([&] {
603  ++turn;
604  q.take();
605  });
606 
607  std::thread producer1([&] {
608  ++turn;
609  while (turn < 4) {
610  ;
611  }
612  ++turn;
613  q.add(SlowMover(true));
614  });
615  std::thread producer2([&] {
616  ++turn;
617  while (turn < 5) {
618  ;
619  }
620  q.add(SlowMover(false));
621  });
622 
623  producer1.join();
624  producer2.join();
625  consumer1.join();
626  consumer2.join();
627 }
628 
629 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
630  bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
631 }
632 
633 template <typename T>
634 struct UBQ : public UnboundedBlockingQueue<T> {
635  explicit UBQ(int) {}
636 };
637 
638 TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
639  bugD3527722_test<UBQ<SlowMover>>();
640 }
641 
642 template <typename TPE>
643 static void removeThreadTest() {
644  // test that adding a .then() after we have removed some threads
645  // doesn't cause deadlock and they are executed on different threads
647  std::thread::id id1, id2;
648  TPE fe(2);
649  f = folly::makeFuture()
650  .via(&fe)
651  .thenValue([&id1](auto&&) {
652  burnMs(100)();
653  id1 = std::this_thread::get_id();
654  })
655  .thenValue([&id2](auto&&) {
656  return 77;
657  id2 = std::this_thread::get_id();
658  });
659  fe.setNumThreads(1);
660 
661  // future::then should be fulfilled because there is other thread available
662  EXPECT_EQ(77, std::move(*f).get());
663  // two thread should be different because then part should be rescheduled to
664  // the other thread
665  EXPECT_NE(id1, id2);
666 }
667 
668 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
669  removeThreadTest<IOThreadPoolExecutor>();
670 }
671 
672 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
673  removeThreadTest<CPUThreadPoolExecutor>();
674 }
675 
676 template <typename TPE>
678  TPE tpe(10);
679  EXPECT_EQ(10, tpe.numThreads());
680 
681  std::atomic<int> completed(0);
682  auto f = [&]() {
683  burnMs(10)();
684  completed++;
685  };
686  for (int i = 0; i < 1000; i++) {
687  tpe.add(f);
688  }
689  tpe.setNumThreads(8);
690  EXPECT_EQ(8, tpe.numThreads());
691  tpe.setNumThreads(5);
692  EXPECT_EQ(5, tpe.numThreads());
693  tpe.setNumThreads(15);
694  EXPECT_EQ(15, tpe.numThreads());
695  tpe.join();
696  EXPECT_EQ(1000, completed);
697 }
698 
699 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
700  resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
701 }
702 
703 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
704  resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
705 }
706 
707 template <typename TPE>
709  auto executor = std::make_unique<TPE>(4);
710 
711  auto f = futures::sleep(std::chrono::milliseconds{100})
712  .via(executor.get())
713  .thenValue([keepAlive = getKeepAliveToken(executor.get())](
714  auto&&) { return 42; })
715  .semi();
716 
717  executor.reset();
718 
719  EXPECT_TRUE(f.isReady());
720  EXPECT_EQ(42, std::move(f).get());
721 }
722 
723 TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
724  keepAliveTest<IOThreadPoolExecutor>();
725 }
726 
727 TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
728  keepAliveTest<CPUThreadPoolExecutor>();
729 }
730 
732  int count = 0;
733  ThreadPoolExecutor::withAll([&count](ThreadPoolExecutor&) { count++; });
734  return count;
735 }
736 
737 template <typename TPE>
740  {
741  TPE tpe(10);
743  {
744  TPE tpe2(5);
746  }
748  }
750 }
751 
752 TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
753  registersToExecutorListTest<IOThreadPoolExecutor>();
754 }
755 
756 TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
757  registersToExecutorListTest<CPUThreadPoolExecutor>();
758 }
759 
760 template <typename TPE>
762  auto ntf = std::make_shared<NamedThreadFactory>("my_executor");
763  TPE tpe(10, ntf);
764  EXPECT_EQ("my_executor", tpe.getName());
765 }
766 
767 TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
768  testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
769 }
770 
771 TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
772  testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
773 }
774 
775 TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
776  boost::barrier barrier{3};
777  auto twice_waiting_task = [&] { barrier.wait(), barrier.wait(); };
779  e.setThreadDeathTimeout(std::chrono::milliseconds(100));
780  e.add(twice_waiting_task);
781  e.add(twice_waiting_task);
782  barrier.wait(); // ensure both tasks are mid-flight
783  EXPECT_EQ(2, e.getPoolStats().activeThreadCount) << "sanity check";
784 
785  auto pred = [&] { return e.getPoolStats().activeThreadCount == 0; };
786  EXPECT_FALSE(pred()) << "sanity check";
787  barrier.wait(); // let both mid-flight tasks complete
788  EXPECT_EQ(
791  std::chrono::steady_clock::now() + std::chrono::seconds(1), pred));
792 }
793 
794 TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
796  e.setThreadDeathTimeout(std::chrono::milliseconds(0));
797  std::atomic<uint64_t> count{0};
798  for (int i = 0; i < 10000; i++) {
799  Baton<> b;
800  e.add([&]() {
801  count.fetch_add(1, std::memory_order_relaxed);
802  b.post();
803  });
804  b.wait();
805  }
806  e.join();
807  EXPECT_EQ(count, 10000);
808 }
809 
810 TEST(ThreadPoolExecutorTest, AddPerf) {
811  auto queue = std::make_unique<
814  1000,
815  std::move(queue),
816  std::make_shared<NamedThreadFactory>("CPUThreadPool"));
817  e.setThreadDeathTimeout(std::chrono::milliseconds(1));
818  for (int i = 0; i < 10000; i++) {
819  e.add([&]() { e.add([]() { /* sleep override */ usleep(1000); }); });
820  }
821  e.stop();
822 }
823 
824 template <typename TPE>
825 static void WeakRefTest() {
826  // test that adding a .then() after we have
827  // started shutting down does not deadlock
829  int counter{0};
830  {
831  TPE fe(1);
832  f = folly::makeFuture()
833  .via(&fe)
834  .thenValue([](auto&&) { burnMs(100)(); })
835  .thenValue([&](auto&&) { ++counter; })
836  .via(fe.weakRef())
837  .thenValue([](auto&&) { burnMs(100)(); })
838  .thenValue([&](auto&&) { ++counter; });
839  }
841  EXPECT_EQ(1, counter);
842 }
843 
844 template <typename TPE>
845 static void virtualExecutorTest() {
846  using namespace std::literals;
847 
849  int counter{0};
850  {
851  TPE fe(1);
852  {
853  VirtualExecutor ve(fe);
854  f = futures::sleep(100ms)
855  .via(&ve)
856  .thenValue([&](auto&&) {
857  ++counter;
858  return futures::sleep(100ms);
859  })
860  .via(&fe)
861  .thenValue([&](auto&&) { ++counter; })
862  .semi();
863  }
864  EXPECT_EQ(1, counter);
865 
866  bool functionDestroyed{false};
867  bool functionCalled{false};
868  {
869  VirtualExecutor ve(fe);
870  auto guard = makeGuard([&functionDestroyed] {
871  std::this_thread::sleep_for(100ms);
872  functionDestroyed = true;
873  });
874  ve.add([&functionCalled, guard = std::move(guard)] {
875  functionCalled = true;
876  });
877  }
878  EXPECT_TRUE(functionCalled);
879  EXPECT_TRUE(functionDestroyed);
880  }
881  EXPECT_TRUE(f->isReady());
882  EXPECT_NO_THROW(std::move(*f).get());
883  EXPECT_EQ(2, counter);
884 }
885 
886 TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
887  WeakRefTest<IOThreadPoolExecutor>();
888 }
889 
890 TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
891  WeakRefTest<CPUThreadPoolExecutor>();
892 }
893 
894 TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
895  virtualExecutorTest<IOThreadPoolExecutor>();
896 }
897 
898 TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
899  virtualExecutorTest<CPUThreadPoolExecutor>();
900 }
static void taskStats()
bool hasCallback() override
static void testUsesNameFromNamedThreadFactory()
spin_result spin_yield_until(std::chrono::time_point< Clock, Duration > const &deadline, F f)
Definition: Spin.h:70
auto f
static void registersToExecutorListTest()
#define FAIL()
Definition: gtest.h:1822
static void futureExecutor()
#define EXPECT_NO_THROW(statement)
Definition: gtest.h:1845
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
SlowMover(bool slow_=false)
char b
static void resize()
void destroy< IOThreadPoolExecutor >()
void setContextData(const RequestToken &val, std::unique_ptr< RequestData > data)
Definition: Request.cpp:129
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static const int8_t LO_PRI
Definition: Executor.h:48
Future< Unit > sleep(Duration dur, Timekeeper *tk)
Definition: Future.cpp:42
void addObserver(std::shared_ptr< Observer >)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
SlowMover & operator=(SlowMover &&other) noexcept
std::enable_if< folly::isFuture< invoke_result_t< F > >::value, invoke_result_t< F > >::type addFuture(F func)
static void poolStats()
static void basic()
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
in_place_tag in_place(in_place_tag={})
Definition: Utility.h:235
requires E e noexcept(noexcept(s.error(std::move(e))))
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
static void expiration()
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle *) override
static void resizeThreadWhileExecutingTest()
static void destroy()
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:170
void add(Func f) override
Function< void()> Func
Definition: Executor.h:27
static void stop()
SlowMover(SlowMover &&other) noexcept
static void resizeUnderLoad()
LogLevel min
Definition: LogLevel.cpp:30
void add(Func func) override
void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle *) override
void removeObserver(std::shared_ptr< Observer >)
virtual void addWithPriority(Func, int8_t priority)
Definition: Executor.cpp:25
static void withAll(FunctionRef< void(ThreadPoolExecutor &)> f)
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
static void removeThreadTest()
void setThreadDeathTimeout(std::chrono::milliseconds timeout)
void bugD3527722_test()
void setNumThreads(size_t numThreads)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
Definition: Try.h:51
void post() noexcept
Definition: Baton.h:123
int getNumThreadPoolExecutors()
int * count
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
std::thread newThread(Func &&func) override
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
Definition: ScopeGuard.h:184
std::thread newThread(Func &&func) override
static Func burnMs(uint64_t ms)
std::atomic< int > counter
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
void threadStarted(ThreadPoolExecutor::ThreadHandle *) override
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
Definition: Future-inl.h:1290
void join(const Delim &delimiter, Iterator begin, Iterator end, String &output)
Definition: String-inl.h:498
void keepAliveTest()
InlineExecutor exe
Definition: Benchmark.cpp:337
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
Definition: Executor.h:200
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void addWithPriority(Func f, int8_t priority) override
static void virtualExecutorTest()
void throwSystemError(Args &&...args)
Definition: Exception.h:76
static const int8_t HI_PRI
Definition: Executor.h:50
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
static void WeakRefTest()
void threadStopped(ThreadPoolExecutor::ThreadHandle *) override
RequestData * getContextData(const RequestToken &val)
Definition: Request.cpp:151
char c
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static RequestContext * get()
Definition: Request.cpp:290
StringPiece data_
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
void clear() noexcept
Definition: Optional.h:251
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934
void stop< IOThreadPoolExecutor >()