44 bool taskAdded =
false;
45 size_t iterations = 0;
47 FiberManager manager(std::make_unique<SimpleLoopController>());
48 auto& loopController =
52 loopController.setTimeFunc([&] {
return now; });
54 auto loopFunc = [&]() {
59 auto res = baton.
try_wait_for(std::chrono::milliseconds(230));
64 loopController.stop();
69 auto res = baton.
try_wait_for(std::chrono::milliseconds(130));
74 loopController.stop();
78 now += std::chrono::milliseconds(50);
87 bool taskAdded =
false;
88 size_t iterations = 0;
91 FiberManager manager(std::make_unique<SimpleLoopController>());
92 auto& loopController =
95 auto loopFunc = [&]() {
101 auto res = baton.
try_wait_for(std::chrono::milliseconds(130));
106 loopController.stop();
110 std::this_thread::sleep_for(std::chrono::milliseconds(50));
112 if (iterations == 2) {
118 loopController.loop(
std::move(loopFunc));
122 size_t tasksComplete = 0;
126 FiberManager manager(std::make_unique<EventBaseLoopController>());
128 .attachEventBase(evb);
130 auto task = [&](
size_t timeout_ms) {
134 auto res = baton.
try_wait_for(std::chrono::milliseconds(timeout_ms));
140 std::chrono::duration_cast<std::chrono::milliseconds>(finish -
start);
142 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
143 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
145 if (++tasksComplete == 2) {
151 manager.
addTask([&]() { task(500); });
152 manager.
addTask([&]() { task(250); });
161 size_t tasksComplete = 0;
165 FiberManager manager(std::make_unique<EventBaseLoopController>());
167 .attachEventBase(evb);
176 auto res = baton.
try_wait_for(std::chrono::milliseconds(130));
182 std::chrono::duration_cast<std::chrono::milliseconds>(finish -
start);
184 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
186 if (++tasksComplete == 1) {
198 FiberManager manager(std::make_unique<SimpleLoopController>());
207 auto thr = std::thread([&]() {
208 std::this_thread::sleep_for(std::chrono::milliseconds(300));
231 FiberManager manager(std::make_unique<SimpleLoopController>());
234 bool fiberRunning =
false;
240 fiberRunning =
false;
247 auto thr = std::thread([&]() {
248 std::this_thread::sleep_for(std::chrono::milliseconds(300));
252 while (fiberRunning) {
260 FiberManager manager(std::make_unique<SimpleLoopController>());
262 std::atomic<bool> threadWaiting(
false);
264 auto thr = std::thread([&]() {
265 threadWaiting =
true;
267 threadWaiting =
false;
270 while (!threadWaiting) {
272 std::this_thread::sleep_for(std::chrono::milliseconds(300));
278 while (threadWaiting) {
287 std::vector<Promise<int>> pendingFibers;
288 bool taskAdded =
false;
290 FiberManager manager(std::make_unique<SimpleLoopController>());
291 auto& loopController =
294 auto loopFunc = [&]() {
297 std::vector<std::function<std::unique_ptr<int>()>> funcs;
298 for (
int i = 0;
i < 3; ++
i) {
299 funcs.push_back([
i, &pendingFibers]() {
301 pendingFibers.push_back(
std::move(promise));
303 return std::make_unique<int>(
i * 2 + 1);
307 auto iter =
addTasks(funcs.begin(), funcs.end());
310 while (iter.hasNext()) {
311 auto result = iter.awaitNext();
312 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
319 }
else if (pendingFibers.size()) {
320 pendingFibers.back().setValue(0);
321 pendingFibers.pop_back();
323 loopController.stop();
327 loopController.loop(
std::move(loopFunc));
332 struct ExpectedException {};
338 throw ExpectedException();
346 throw ExpectedException();
354 std::vector<Promise<int>> pendingFibers;
355 bool taskAdded =
false;
357 FiberManager manager(std::make_unique<SimpleLoopController>());
358 auto& loopController =
361 auto loopFunc = [&]() {
364 std::vector<std::function<int()>> funcs;
365 for (
size_t i = 0;
i < 3; ++
i) {
366 funcs.push_back([
i, &pendingFibers]() {
368 pendingFibers.push_back(
std::move(promise));
371 throw std::runtime_error(
"Runtime");
377 auto iter =
addTasks(funcs.begin(), funcs.end());
380 while (iter.hasNext()) {
382 int result = iter.awaitNext();
384 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
394 }
else if (pendingFibers.size()) {
395 pendingFibers.back().setValue(0);
396 pendingFibers.pop_back();
398 loopController.stop();
402 loopController.loop(
std::move(loopFunc));
406 std::vector<Promise<int>> pendingFibers;
407 bool taskAdded =
false;
409 FiberManager manager(std::make_unique<SimpleLoopController>());
410 auto& loopController =
413 auto loopFunc = [&]() {
416 std::vector<std::function<void()>> funcs;
417 for (
size_t i = 0;
i < 3; ++
i) {
418 funcs.push_back([&pendingFibers]() {
420 pendingFibers.push_back(
std::move(promise));
425 auto iter =
addTasks(funcs.begin(), funcs.end());
428 while (iter.hasNext()) {
436 }
else if (pendingFibers.size()) {
437 pendingFibers.back().setValue(0);
438 pendingFibers.pop_back();
440 loopController.stop();
444 loopController.loop(
std::move(loopFunc));
448 std::vector<Promise<int>> pendingFibers;
449 bool taskAdded =
false;
451 FiberManager manager(std::make_unique<SimpleLoopController>());
452 auto& loopController =
455 auto loopFunc = [&]() {
458 std::vector<std::function<void()>> funcs;
459 for (
size_t i = 0;
i < 3; ++
i) {
460 funcs.push_back([
i, &pendingFibers]() {
462 pendingFibers.push_back(
std::move(promise));
465 throw std::runtime_error(
"");
470 auto iter =
addTasks(funcs.begin(), funcs.end());
473 while (iter.hasNext()) {
486 }
else if (pendingFibers.size()) {
487 pendingFibers.back().setValue(0);
488 pendingFibers.pop_back();
490 loopController.stop();
494 loopController.loop(
std::move(loopFunc));
498 std::vector<Promise<int>> pendingFibers;
499 bool taskAdded =
false;
501 FiberManager manager(std::make_unique<SimpleLoopController>());
502 auto& loopController =
505 auto loopFunc = [&]() {
508 std::vector<std::function<void()>> funcs;
509 for (
size_t i = 0;
i < 3; ++
i) {
510 funcs.push_back([&pendingFibers]() {
512 pendingFibers.push_back(
std::move(promise));
517 auto iter =
addTasks(funcs.begin(), funcs.end());
540 }
else if (pendingFibers.size()) {
541 pendingFibers.back().setValue(0);
542 pendingFibers.pop_back();
544 loopController.stop();
548 loopController.loop(
std::move(loopFunc));
556 auto makeTask = [&](
size_t taskId) {
557 return [&, taskId]() ->
size_t {
558 batons[taskId].
wait();
588 std::vector<Promise<int>> pendingFibers;
589 bool taskAdded =
false;
591 FiberManager manager(std::make_unique<SimpleLoopController>());
592 auto& loopController =
595 auto loopFunc = [&]() {
598 std::vector<std::function<int()>> funcs;
599 for (
size_t i = 0;
i < 3; ++
i) {
600 funcs.push_back([
i, &pendingFibers]() {
602 pendingFibers.push_back(
std::move(promise));
608 std::vector<std::pair<size_t, int>> results;
609 forEach(funcs.begin(), funcs.end(), [&results](
size_t id,
int result) {
610 results.emplace_back(
id, result);
614 for (
size_t i = 0;
i < 3; ++
i) {
619 }
else if (pendingFibers.size()) {
620 pendingFibers.back().setValue(0);
621 pendingFibers.pop_back();
623 loopController.stop();
627 loopController.loop(
std::move(loopFunc));
631 std::vector<Promise<int>> pendingFibers;
632 bool taskAdded =
false;
634 FiberManager manager(std::make_unique<SimpleLoopController>());
635 auto& loopController =
638 auto loopFunc = [&]() {
641 std::vector<std::function<int()>> funcs;
642 for (
size_t i = 0;
i < 3; ++
i) {
643 funcs.push_back([
i, &pendingFibers]() {
645 pendingFibers.push_back(
std::move(promise));
651 auto results =
collectN(funcs.begin(), funcs.end(), 2);
654 for (
size_t i = 0;
i < 2; ++
i) {
659 }
else if (pendingFibers.size()) {
660 pendingFibers.back().setValue(0);
661 pendingFibers.pop_back();
663 loopController.stop();
667 loopController.loop(
std::move(loopFunc));
671 std::vector<Promise<int>> pendingFibers;
672 bool taskAdded =
false;
674 FiberManager manager(std::make_unique<SimpleLoopController>());
675 auto& loopController =
678 auto loopFunc = [&]() {
681 std::vector<std::function<int()>> funcs;
682 for (
size_t i = 0;
i < 3; ++
i) {
683 funcs.push_back([&pendingFibers]() ->
size_t {
685 pendingFibers.push_back(
std::move(promise));
687 throw std::runtime_error(
"Runtime");
692 collectN(funcs.begin(), funcs.end(), 2);
698 }
else if (pendingFibers.size()) {
699 pendingFibers.back().setValue(0);
700 pendingFibers.pop_back();
702 loopController.stop();
706 loopController.loop(
std::move(loopFunc));
710 std::vector<Promise<int>> pendingFibers;
711 bool taskAdded =
false;
713 FiberManager manager(std::make_unique<SimpleLoopController>());
714 auto& loopController =
717 auto loopFunc = [&]() {
720 std::vector<std::function<void()>> funcs;
721 for (
size_t i = 0;
i < 3; ++
i) {
722 funcs.push_back([&pendingFibers]() {
724 pendingFibers.push_back(
std::move(promise));
729 auto results =
collectN(funcs.begin(), funcs.end(), 2);
734 }
else if (pendingFibers.size()) {
735 pendingFibers.back().setValue(0);
736 pendingFibers.pop_back();
738 loopController.stop();
742 loopController.loop(
std::move(loopFunc));
746 std::vector<Promise<int>> pendingFibers;
747 bool taskAdded =
false;
749 FiberManager manager(std::make_unique<SimpleLoopController>());
750 auto& loopController =
753 auto loopFunc = [&]() {
756 std::vector<std::function<void()>> funcs;
757 for (
size_t i = 0;
i < 3; ++
i) {
758 funcs.push_back([&pendingFibers]() {
760 pendingFibers.push_back(
std::move(promise));
762 throw std::runtime_error(
"Runtime");
767 collectN(funcs.begin(), funcs.end(), 2);
773 }
else if (pendingFibers.size()) {
774 pendingFibers.back().setValue(0);
775 pendingFibers.pop_back();
777 loopController.stop();
781 loopController.loop(
std::move(loopFunc));
785 std::vector<Promise<int>> pendingFibers;
786 bool taskAdded =
false;
788 FiberManager manager(std::make_unique<SimpleLoopController>());
789 auto& loopController =
792 auto loopFunc = [&]() {
795 std::vector<std::function<int()>> funcs;
796 for (
size_t i = 0;
i < 3; ++
i) {
797 funcs.push_back([
i, &pendingFibers]() {
799 pendingFibers.push_back(
std::move(promise));
805 auto results =
collectAll(funcs.begin(), funcs.end());
807 for (
size_t i = 0;
i < 3; ++
i) {
812 }
else if (pendingFibers.size()) {
813 pendingFibers.back().setValue(0);
814 pendingFibers.pop_back();
816 loopController.stop();
820 loopController.loop(
std::move(loopFunc));
824 std::vector<Promise<int>> pendingFibers;
825 bool taskAdded =
false;
827 FiberManager manager(std::make_unique<SimpleLoopController>());
828 auto& loopController =
831 auto loopFunc = [&]() {
834 std::vector<std::function<void()>> funcs;
835 for (
size_t i = 0;
i < 3; ++
i) {
836 funcs.push_back([&pendingFibers]() {
838 pendingFibers.push_back(
std::move(promise));
847 }
else if (pendingFibers.size()) {
848 pendingFibers.back().setValue(0);
849 pendingFibers.pop_back();
851 loopController.stop();
855 loopController.loop(
std::move(loopFunc));
859 std::vector<Promise<int>> pendingFibers;
860 bool taskAdded =
false;
862 FiberManager manager(std::make_unique<SimpleLoopController>());
863 auto& loopController =
866 auto loopFunc = [&]() {
869 std::vector<std::function<int()>> funcs;
870 for (
size_t i = 0;
i < 3; ++
i) {
871 funcs.push_back([
i, &pendingFibers]() {
873 pendingFibers.push_back(
std::move(promise));
876 throw std::runtime_error(
"This exception will be ignored");
882 auto result =
collectAny(funcs.begin(), funcs.end());
888 }
else if (pendingFibers.size()) {
889 pendingFibers.back().setValue(0);
890 pendingFibers.pop_back();
892 loopController.stop();
896 loopController.loop(
std::move(loopFunc));
905 void expectMainContext(
bool& ran,
int* mainLocation,
int* fiberLocation) {
908 constexpr
auto const kHereToFiberMaxDist = 0x2000 /
sizeof(int);
912 constexpr
auto const kHereToMainMaxDist =
916 EXPECT_GT(std::abs(&here - fiberLocation), kHereToFiberMaxDist);
919 EXPECT_LT(std::abs(&here - mainLocation), kHereToMainMaxDist);
928 FiberManager manager(std::make_unique<SimpleLoopController>());
929 auto& loopController =
932 bool checkRan =
false;
936 [&]() { expectMainContext(checkRan, &mainLocation,
nullptr); });
943 explicit A(
int value_) :
value(value_) {}
944 A(
const A&) =
delete;
951 expectMainContext(checkRan, &mainLocation, &stackLocation);
958 loopController.loop([&]() { loopController.stop(); });
964 FiberManager manager(std::make_unique<SimpleLoopController>());
965 auto& loopController =
968 bool checkRan =
false;
973 [&]() {
return 1234; },
977 expectMainContext(checkRan, &mainLocation,
nullptr);
982 loopController.loop([&]() { loopController.stop(); });
991 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
992 auto& loopController =
995 size_t fibersRun = 0;
997 for (
size_t i = 0;
i < 5; ++
i) {
998 manager.
addTask([&]() { ++fibersRun; });
1000 loopController.loop([&]() { loopController.stop(); });
1006 for (
size_t i = 0;
i < 5; ++
i) {
1007 manager.
addTask([&]() { ++fibersRun; });
1009 loopController.loop([&]() { loopController.stop(); });
1020 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1021 auto& loopController =
1024 size_t fibersRun = 0;
1026 for (
size_t i = 0;
i < 10; ++
i) {
1027 manager.
addTask([&]() { ++fibersRun; });
1034 loopController.loop([&]() { loopController.stop(); });
1042 FiberManager manager(std::make_unique<SimpleLoopController>());
1043 auto& loopController =
1047 result[0] = result[1] = 0;
1065 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1066 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1067 remoteThread0.join();
1068 remoteThread1.join();
1072 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1080 FiberManager manager(std::make_unique<SimpleLoopController>());
1083 result[0] = result[1] = 0;
1086 std::thread remoteThread0{[&]() {
1092 std::thread remoteThread1{[&]() {
1098 remoteThread0.join();
1099 remoteThread1.join();
1108 savedPromise[0]->setValue(42);
1109 savedPromise[1]->setValue(43);
1121 FiberManager fm(std::make_unique<SimpleLoopController>());
1137 FiberManager fm(std::make_unique<SimpleLoopController>());
1138 std::thread remote([&]() {
1152 std::thread remote2([&]() { savedPromise->setValue(47); });
1162 template <
typename Data>
1169 local<Data>().
value = 43;
1174 local<Data>().
value = 44;
1183 local<Data>().
value = 43;
1190 local<Data>().
value = 43;
1194 local<Data>().
value = 44;
1196 std::vector<std::function<void()>>
tasks{task};
1211 testFiberLocal<SimpleData>();
1216 char _[1024 * 1024];
1220 testFiberLocal<LargeData>();
1232 local<CrazyData>().
data = 0;
1241 fm.
addTask([]() { local<CrazyData>().
data = 41; });
1248 FiberManager manager(std::make_unique<SimpleLoopController>());
1249 auto& loopController =
1252 bool checkRan =
false;
1259 loopController.loop([&]() {
1261 loopController.stop();
1269 FiberManager fm(std::make_unique<SimpleLoopController>());
1271 bool checkRun1 =
false;
1272 bool checkRun2 =
false;
1273 bool checkRun3 =
false;
1274 bool checkRun4 =
false;
1367 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1371 .attachEventBase(evb);
1373 std::vector<Baton> batons(10);
1375 size_t tasksRun = 0;
1376 for (
size_t i = 0;
i < 30; ++
i) {
1377 manager.
addTask([
i, &batons, &tasksRun]() {
1380 if (
i < batons.size()) {
1396 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1401 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1406 for (
size_t i = 0;
i < batons.size(); ++
i) {
1413 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1418 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1425 FiberManager manager(std::make_unique<EventBaseLoopController>());
1429 .attachEventBase(evb);
1431 size_t fibersRun = 0;
1436 baton.
wait(timeoutHandler);
1445 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1457 FiberManager manager(std::make_unique<EventBaseLoopController>());
1461 .attachEventBase(evb);
1463 constexpr
size_t kNumTimeoutTasks = 10000;
1464 size_t tasksCount = kNumTimeoutTasks;
1467 for (
size_t i = 0;
i < kNumTimeoutTasks; ++
i) {
1476 baton.
wait(timeoutHandler);
1477 if (--tasksCount == 0) {
1487 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1488 auto& loopController =
1493 auto f1 = fiberManager.
addTaskFuture([&]() {
return testValue1; });
1495 loopController.loop([&]() { loopController.stop(); });
1505 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1506 auto& loopController =
1509 bool ranLocal =
false;
1513 bool ranRemote =
false;
1517 loopController.loop([&]() { loopController.stop(); });
1522 futureRemote.
wait();
1555 static constexpr
size_t kTasks = 10;
1556 static constexpr
size_t kIterations = 10000;
1557 static constexpr
size_t kNumTokens = 10;
1564 FiberManager manager(std::make_unique<EventBaseLoopController>());
1567 .attachEventBase(evb);
1570 std::shared_ptr<folly::EventBase> completionCounter(
1573 for (
size_t i = 0;
i < kTasks; ++
i) {
1574 manager.
addTask([&, completionCounter]() {
1575 for (
size_t j = 0; j < kIterations; ++j) {
1594 std::thread threadA([&] { task(counterA, batonA); });
1595 std::thread threadB([&] { task(counterB, batonB); });
1608 template <
typename ExecutorT>
1611 executor, [=](std::vector<int>&&
batch) {
1613 std::vector<std::string> results;
1614 for (
auto& it :
batch) {
1615 results.push_back(folly::to<std::string>(it));
1620 auto indexCopy = index;
1621 auto result = batchDispatcher.
add(
std::move(indexCopy));
1632 for (
int i = 0;
i < batchSize;
i++) {
1642 for (
int i = 0;
i < batchSize;
i++) {
1650 template <
typename ExecutorT>
1653 int totalNumberOfElements,
1654 std::vector<int> input) {
1657 std::vector<std::string>,
1659 batchDispatcher(executor, [=](std::vector<std::vector<int>>&&
batch) {
1660 std::vector<std::vector<std::string>> results;
1661 int numberOfElements = 0;
1663 numberOfElements +=
unit.size();
1664 std::vector<std::string> result;
1665 for (
auto& element :
unit) {
1666 result.push_back(folly::to<std::string>(element));
1670 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1680 template <
typename ExecutorT>
1683 int totalNumberOfElements,
1686 executor, [=, &executor](std::vector<int>&&
batch) {
1688 std::vector<std::string> results;
1689 std::vector<folly::Future<std::vector<std::string>>>
1690 innerDispatchResultFutures;
1692 std::vector<int>
group;
1694 group.push_back(
unit);
1695 if (group.size() == 5) {
1696 auto localGroup =
group;
1700 executor, totalNumberOfElements, localGroup));
1705 innerDispatchResultFutures.begin(),
1706 innerDispatchResultFutures.end())
1708 .then([&](std::vector<
Try<std::vector<std::string>>>
1709 innerDispatchResults) {
1710 for (
auto&
unit : innerDispatchResults) {
1711 for (
auto& element :
unit.value()) {
1712 results.push_back(element);
1720 auto indexCopy = index;
1721 auto result = batchDispatcher.
add(
std::move(indexCopy));
1731 int totalNumberOfElements = 20;
1732 for (
int i = 0;
i < totalNumberOfElements;
i++) {
1741 template <
typename ExecutorT>
1744 executor, [](std::vector<int> &&) -> std::vector<int> {
1745 throw std::runtime_error(
"Surprise!!");
1748 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1757 int totalNumberOfElements = 5;
1758 for (
int i = 0;
i < totalNumberOfElements;
i++) {
1773 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests 1774 #if ENABLE_TRACE_IN_TEST 1775 #define OUTPUT_TRACE std::cerr 1776 #else // ENABLE_TRACE_IN_TEST 1778 template <
typename T>
1787 #define OUTPUT_TRACE devNullPiper 1788 #endif // ENABLE_TRACE_IN_TEST 1797 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1798 double endAfter = start + msecToDoIO;
1799 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1803 throw std::logic_error(
"Simulating preprocessing failure");
1811 Job& operator=(
Job&&) =
default;
1815 return folly::to<ResultT>(
std::move(input));
1819 size_t expectedCount = inputs.size();
1820 std::vector<ResultT> results;
1821 results.reserve(expectedCount);
1822 for (
size_t i = 0;
i < expectedCount; ++
i) {
1830 std::vector<Job>& jobs,
1833 for (
size_t i = 0;
i <
count; ++
i) {
1834 jobs.emplace_back(
Job(atomicBatchDispatcher.
getToken(),
i));
1846 std::vector<Job>& jobs,
1849 size_t problemIndex =
size_t(-1)) {
1851 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1853 results.resize(jobs.size());
1854 for (
size_t i = 0;
i < jobs.size(); ++
i) {
1856 [
i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1860 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1861 if (i == problemIndex) {
1870 OUTPUT_TRACE <<
"Result future filled for job #" << i << std::endl;
1872 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1873 if (i == problemIndex) {
1878 OUTPUT_TRACE <<
"Preprocessing failed for job #" <<
i << std::endl;
1888 OUTPUT_TRACE <<
"results[" << i <<
"].value() : " << results[
i]->value()
1890 }
catch (std::exception& e) {
1891 OUTPUT_TRACE <<
"Exception : " << e.what() << std::endl;
1896 template <
typename TException>
1899 size_t expectedNumResults) {
1900 size_t numResultsFilled = 0;
1901 for (
size_t i = 0;
i < results.size(); ++
i) {
1908 EXPECT_EQ(numResultsFilled, expectedNumResults);
1913 size_t expectedNumResults) {
1914 size_t numResultsFilled = 0;
1915 for (
size_t i = 0;
i < results.size(); ++
i) {
1925 EXPECT_EQ(numResultsFilled, expectedNumResults);
1930 #define SET_UP_TEST_FUNC \ 1931 using namespace AtomicBatchDispatcherTesting; \ 1932 folly::EventBase evb; \ 1933 auto& executor = getFiberManager(evb); \ 1934 const size_t COUNT = 11; \ 1935 std::vector<Job> jobs; \ 1936 jobs.reserve(COUNT); \ 1937 std::vector<folly::Optional<folly::Future<ResultT>>> results; \ 1938 results.reserve(COUNT); \ 1939 DispatchFunctionT dispatchFunc 1948 auto atomicBatchDispatcher =
1950 createJobs(atomicBatchDispatcher, jobs, COUNT);
1952 atomicBatchDispatcher.commit();
1968 auto atomicBatchDispatcher =
1970 createJobs(atomicBatchDispatcher, jobs, COUNT);
1972 throw std::runtime_error(
1973 "Unexpected exception in user code before commit called");
1979 validateResults<ABDCommitNotCalledException>(results, COUNT);
1989 auto atomicBatchDispatcher =
1991 createJobs(atomicBatchDispatcher, jobs, COUNT);
1993 atomicBatchDispatcher.commit();
1995 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
2005 auto atomicBatchDispatcher =
2007 createJobs(atomicBatchDispatcher, jobs, COUNT);
2009 atomicBatchDispatcher.commit();
2020 auto atomicBatchDispatcher =
2022 createJobs(atomicBatchDispatcher, jobs, COUNT);
2023 atomicBatchDispatcher.commit();
2038 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2040 throw std::runtime_error(
"Unexpected exception in user dispatch function");
2042 auto atomicBatchDispatcher =
2044 createJobs(atomicBatchDispatcher, jobs, COUNT);
2046 atomicBatchDispatcher.commit();
2048 validateResults<std::runtime_error>(results, COUNT);
2058 std::make_unique<folly::VirtualEventBase>(*thread.
getEventBase());
2063 baton.try_wait_for(std::chrono::milliseconds{100});
2089 std::thread testThread([&] {
2090 for (
int i = 0;
i < 100;
i++) {
2100 for (
int numFibers = 0; numFibers < 100; numFibers++) {
2102 for (
int i = 0;
i < 20;
i++) {
2128 std::thread unlockThread([&] {
2129 std::this_thread::sleep_for(
2130 std::chrono::milliseconds{100});
2134 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2137 auto locked = mutex.
timed_lock(std::chrono::seconds{1});
2148 unlockThread.join();
2159 auto locked = mutex.
timed_lock(std::chrono::seconds{1});
2168 auto locked = mutex.
timed_lock(std::chrono::seconds{1});
2183 #ifndef FOLLY_SANITIZE_ADDRESS 2184 void expectStackHighWatermark(
size_t minStackSize,
size_t stackHighWatermark) {
2187 EXPECT_LT(minStackSize, stackHighWatermark);
2190 void expectStackHighWatermark(
size_t,
size_t stackHighWatermark) {
2207 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2208 auto& loopController =
2211 static constexpr
size_t n = 1000;
2215 for (
size_t i = 0;
i < n; ++
i) {
2218 for (
size_t i = 0;
i + 1 < n; ++
i) {
2219 s += b[
i] * b[
i + 1];
2225 loopController.loop([&]() { loopController.stop(); });
2228 std::thread(
f).join();
2237 FiberManager fm(std::make_unique<SimpleLoopController>());
2238 auto& loopController =
2241 static constexpr
size_t n = 1000;
2245 for (
size_t i = 0;
i < n; ++
i) {
2248 for (
size_t i = 0;
i + 1 < n; ++
i) {
2249 s += b[
i] * b[
i + 1];
2257 loopController.loop([&]() { loopController.stop(); });
2260 std::thread(
f).join();
EventBase * getEventBase() const
std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, typename std::pair< size_t, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > >::type collectAny(InputIterator first, InputIterator last)
std::vector< typename std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > >::type > collectAll(InputIterator first, InputIterator last)
void validateResult(std::vector< folly::Optional< folly::Future< ResultT >>> &results, size_t i)
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
#define EXPECT_NO_THROW(statement)
#define EXPECT_THROW(statement, expected_exception)
void addTaskRemote(F &&func)
void doubleBatchOuterDispatch(ExecutorT &executor, int totalNumberOfElements, int index)
size_t fibersAllocated() const
std::vector< ResultT > userDispatchFunc(std::vector< ValueT > &&inputs)
#define EXPECT_EQ(val1, val2)
Future< ResultT > add(ValueT value)
AtomicBatchDispatcher< InputT, ResultT > createAtomicBatchDispatcher(folly::Function< std::vector< ResultT >(std::vector< InputT > &&)> dispatchFunc, size_t initialCapacity)
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
std::list< AtForkTask > tasks
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
std::vector< typename std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, typename std::pair< size_t, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > >::type > collectN(InputIterator first, InputIterator last, size_t n)
folly::Future< std::vector< std::string > > doubleBatchInnerDispatch(ExecutorT &executor, int totalNumberOfElements, std::vector< int > input)
invoke_result_t< F > runInMainContext(F &&func)
constexpr bool kIsSanitizeAddress
detail::Batch batch(size_t batchSize)
void addTaskFinally(F &&func, G &&finally)
SemiFuture< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAllSemiFuture(Fs &&...fs)
void add(folly::Func f) override
#define EXPECT_GE(val1, val2)
void addTaskFinally(F &&func, G &&finally)
bool hasActiveFiber() const
DevNullPiper & operator<<(std::ostream &(*)(std::ostream &))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void preprocess(FiberManager &executor, bool die)
auto addTaskFuture(F &&func) -> folly::Future< typename folly::lift_unit< invoke_result_t< F >>::type >
Single-threaded task execution engine.
void scheduleTimeout(TimeoutController::Duration timeoutMs)
void validateResults(std::vector< folly::Optional< folly::Future< ResultT >>> &results, size_t expectedNumResults)
bool loopOnce(int flags=0)
void dispatchJobs(FiberManager &executor, std::vector< Job > &jobs, std::vector< folly::Optional< folly::Future< ResultT >>> &results, DispatchProblem dispatchProblem=DispatchProblem::None, size_t problemIndex=size_t(-1))
uint32_t fibersPoolResizePeriodMs
DevNullPiper & operator<<(const T &)
constexpr auto data(C &c) -> decltype(c.data())
bool runInEventBaseThread(void(*fn)(T *), T *arg)
TaskIterator< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > addTasks(InputIterator first, InputIterator last)
void forEach(InputIterator first, InputIterator last, F &&f)
LoopController & loopController()
Optional< NamedGroup > group
#define EXPECT_TRUE(condition)
invoke_result_t< F > runInMainContext(F &&func)
AtomicBatchDispatcher< ValueT, ResultT >::Token token
std::atomic< int > counter
struct AtomicBatchDispatcherTesting::DevNullPiper devNullPiper
static FiberManager * getFiberManagerUnsafe()
#define EXPECT_NE(val1, val2)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
const internal::AnythingMatcher _
size_t stackHighWatermark() const
void createJobs(AtomicBatchDispatcher< ValueT, ResultT > &atomicBatchDispatcher, std::vector< Job > &jobs, size_t count)
Job(AtomicBatchDispatcher< ValueT, ResultT >::Token &&t, ValueT i)
#define EXPECT_FALSE(condition)
void batchDispatchExceptionHandling(ExecutorT &executor, int i)
bool timed_lock(const std::chrono::duration< Rep, Period > &duration)
#define EXPECT_LT(val1, val2)
ResultT processSingleInput(ValueT &&input)
#define ASSERT_TRUE(condition)
bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout)
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static RequestContext * get()
folly::VirtualEventBase & getVirtualEventBase()
void singleBatchDispatch(ExecutorT &executor, int batchSize, int index)
size_t fibersPoolSize() const
constexpr detail::First first
auto addTaskRemoteFuture(F &&func) -> folly::Future< typename folly::lift_unit< invoke_result_t< F >>::type >
FirstArgOf< F >::type::value_type await(F &&func)
#define EXPECT_GT(val1, val2)
FiberManager & getFiberManager(EventBase &evb, const FiberManager::Options &opts)