22 #include <boost/thread.hpp> 29 #if defined(__linux__) 33 using namespace folly;
35 using std::chrono::duration_cast;
36 using std::chrono::microseconds;
37 using std::chrono::milliseconds;
38 using std::chrono::steady_clock;
49 static const auto timeFactor = std::chrono::milliseconds(400);
50 std::chrono::milliseconds testInterval(
int n) {
51 return n * timeFactor;
53 int getTicksWithinRange(
int n,
int min,
int max) {
60 microseconds usec(static_cast<microseconds::rep>(
61 duration_cast<microseconds>(timeFactor).
count() * n));
83 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
95 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
105 fs.
addFunction([&] { total += 1; }, testInterval(2),
"add2");
114 atomic<int> total{0};
119 fs.
addFunction([&] { total += 1; }, testInterval(2),
"add2");
122 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
123 fs.
addFunction([&] { total += 3; }, testInterval(3),
"add3");
140 atomic<int> selfCancelCount{0};
144 if (selfCancelCount > 2) {
156 atomic<int> adderCount{0};
158 auto fn2 = [&] { ++fn2Count; };
161 if (adderCount == 2) {
162 fs.
addFunction(fn2, testInterval(3),
"fn2", testInterval(2));
194 atomic<int> total{0};
196 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
197 fs.
addFunction([&] { total += 3; }, testInterval(3),
"add3");
199 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2"),
200 std::invalid_argument);
217 atomic<int> total{0};
219 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
220 fs.
addFunction([&] { total += 3; }, testInterval(2),
"add3");
224 fs.
addFunction([&] { total += 2; }, testInterval(3),
"add22");
230 atomic<int> total{0};
232 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
245 atomic<int> total{0};
247 fs.
addFunction([&] { total += 2; }, testInterval(3),
"add2");
248 fs.
addFunction([&] { total += 3; }, testInterval(3),
"add3");
266 atomic<int> total{0};
284 boost::barrier barrier_a{2};
285 boost::barrier barrier_b{2};
286 boost::barrier barrier_c{2};
287 boost::barrier barrier_d{2};
293 auto mv = std::make_shared<size_t>();
299 state.barrier_a.wait();
301 state.barrier_b.wait();
303 EXPECT_TRUE(bool(mv)) <<
"bug repro: mv was moved-out";
304 state.barrier_c.wait();
306 state.barrier_d.wait();
315 state.barrier_a.wait();
317 fs.resetFunctionTimer(
"nada");
318 EXPECT_EQ(0, state.count) <<
"sanity check";
319 state.barrier_b.wait();
321 state.barrier_c.wait();
322 EXPECT_EQ(1, state.count) <<
"sanity check";
323 state.barrier_d.wait();
325 EXPECT_EQ(2, state.count) <<
"sanity check";
329 atomic<int> total{0};
333 fs.
addFunction([&] { total += 2; }, testInterval(-1),
"add2"),
334 std::invalid_argument);
348 atomic<int> total{0};
352 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2");
356 auto t = total.load();
364 atomic<int> total{0};
367 fs.
addFunction([&] { total += 2; }, testInterval(1),
"add2");
380 atomic<int> total{0};
382 fs.
addFunction([&] { total += 2; }, testInterval(2),
"add2", testInterval(2));
383 fs.
addFunction([&] { total += 3; }, testInterval(3),
"add3", testInterval(2));
386 [&] { total += 2; }, testInterval(3),
"addX", testInterval(-1)),
387 std::invalid_argument);
411 std::atomic<int> ticks(0);
417 std::this_thread::sleep_for(std::chrono::milliseconds(200));
422 std::this_thread::sleep_for(std::chrono::milliseconds(500));
430 std::atomic<int> ticks(0);
436 std::this_thread::sleep_for(std::chrono::milliseconds(200));
442 std::this_thread::sleep_for(std::chrono::milliseconds(500));
450 atomic<int> total{0};
451 const int kTicks = 2;
452 std::chrono::milliseconds minInterval =
453 testInterval(kTicks) - (timeFactor / 5);
454 std::chrono::milliseconds maxInterval =
455 testInterval(kTicks) + (timeFactor / 5);
461 "UniformDistribution",
462 std::chrono::milliseconds(0));
476 std::atomic<int> ticks(0);
479 std::atomic<long long> epoch(0);
497 std::this_thread::sleep_for(std::chrono::milliseconds(600));
506 std::this_thread::sleep_for(std::chrono::milliseconds(1300));
511 atomic<int> total{0};
512 atomic<int> expectedInterval{0};
513 atomic<int> nextInterval{2};
517 [&expectedInterval, &nextInterval]()
mutable {
518 auto interval = nextInterval.load();
519 expectedInterval = interval;
520 nextInterval = interval * interval;
521 return testInterval(interval);
523 "ExponentialBackoff",
525 std::chrono::milliseconds(0));
529 delay(expectedInterval);
531 delay(expectedInterval);
539 atomic<int> total{0};
540 atomic<int> expectedInterval{0};
546 std::gamma_distribution<double> gamma(2.0, 2.0);
549 [&expectedInterval,
generator, gamma]()
mutable {
551 getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
552 return testInterval(expectedInterval);
555 "gamma(2.0,2.0)*100ms",
556 std::chrono::milliseconds(0));
560 delay(expectedInterval);
562 delay(expectedInterval);
570 atomic<int> total{0};
589 atomic<int> total{0};
610 #if defined(__linux__) 616 class PThreadCreateFailure {
618 PThreadCreateFailure() {
621 ~PThreadCreateFailure() {
625 static bool shouldFail() {
626 return forceFailure_ > 0;
630 static std::atomic<int> forceFailure_;
633 std::atomic<int> PThreadCreateFailure::forceFailure_{0};
638 extern "C" int pthread_create(
640 const pthread_attr_t* attr,
641 void* (*start_routine)(
void*),
643 static const auto realFunction =
reinterpret_cast<decltype(&pthread_create)
>(
644 dlsym(RTLD_NEXT,
"pthread_create"));
647 CHECK_NE(realFunction, pthread_create);
649 if (PThreadCreateFailure::shouldFail()) {
653 return realFunction(thread, attr, start_routine, arg);
658 PThreadCreateFailure
fail;
665 atomic<int> total{0};
689 std::thread th([&baton]() {
691 fs.
addFunction([] { delay(10); }, testInterval(2),
"func");
704 std::thread th([&baton]() {
706 fs.
addFunction([] { delay(10); }, testInterval(2),
"func");
719 std::thread th([&baton]() {
720 std::atomic<int> nExecuted(0);
750 fs.
addFunction([] { delay(10); }, testInterval(2),
"func");
#define EXPECT_LE(val1, val2)
void addFunctionOnce(Function< void()> &&cb, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void addFunction(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
#define EXPECT_ANY_THROW(statement)
#define EXPECT_NO_THROW(statement)
#define EXPECT_THROW(statement, expected_exception)
std::default_random_engine generator
#define EXPECT_EQ(val1, val2)
std::chrono::steady_clock::time_point now()
—— Concurrent Priority Queue Implementation ——
bool resetFunctionTimer(StringPiece nameID)
FOLLY_ALWAYS_INLINE bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout, const WaitOptions &opt=wait_options()) noexcept
bool cancelFunction(StringPiece nameID)
State
See Core for details.
void cancelAllFunctionsAndWait()
#define EXPECT_NEAR(val1, val2, abs_error)
#define EXPECT_TRUE(condition)
void addFunctionUniformDistribution(Function< void()> &&cb, std::chrono::milliseconds minInterval, std::chrono::milliseconds maxInterval, StringPiece nameID, std::chrono::milliseconds startDelay)
bool cancelFunctionAndWait(StringPiece nameID)
void addFunctionConsistentDelay(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
#define EXPECT_FALSE(condition)
void addFunctionGenericDistribution(Function< void()> &&cb, IntervalDistributionFunc &&intervalFunc, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)
#define ASSERT_TRUE(condition)
TEST(SequencedExecutor, CPUThreadPoolExecutor)
void setSteady(bool steady)