proxygen
FibersTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <atomic>
17 #include <thread>
18 #include <vector>
19 
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23 
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
32 #include <folly/fibers/Semaphore.h>
35 #include <folly/fibers/WhenN.h>
38 
39 using namespace folly::fibers;
40 
41 using folly::Try;
42 
43 TEST(FiberManager, batonTimedWaitTimeout) {
44  bool taskAdded = false;
45  size_t iterations = 0;
46 
47  FiberManager manager(std::make_unique<SimpleLoopController>());
48  auto& loopController =
49  dynamic_cast<SimpleLoopController&>(manager.loopController());
50 
52  loopController.setTimeFunc([&] { return now; });
53 
54  auto loopFunc = [&]() {
55  if (!taskAdded) {
56  manager.addTask([&]() {
57  Baton baton;
58 
59  auto res = baton.try_wait_for(std::chrono::milliseconds(230));
60 
61  EXPECT_FALSE(res);
62  EXPECT_EQ(5, iterations);
63 
64  loopController.stop();
65  });
66  manager.addTask([&]() {
67  Baton baton;
68 
69  auto res = baton.try_wait_for(std::chrono::milliseconds(130));
70 
71  EXPECT_FALSE(res);
72  EXPECT_EQ(3, iterations);
73 
74  loopController.stop();
75  });
76  taskAdded = true;
77  } else {
78  now += std::chrono::milliseconds(50);
79  iterations++;
80  }
81  };
82 
83  loopController.loop(std::move(loopFunc));
84 }
85 
86 TEST(FiberManager, batonTimedWaitPost) {
87  bool taskAdded = false;
88  size_t iterations = 0;
89  Baton* baton_ptr;
90 
91  FiberManager manager(std::make_unique<SimpleLoopController>());
92  auto& loopController =
93  dynamic_cast<SimpleLoopController&>(manager.loopController());
94 
95  auto loopFunc = [&]() {
96  if (!taskAdded) {
97  manager.addTask([&]() {
98  Baton baton;
99  baton_ptr = &baton;
100 
101  auto res = baton.try_wait_for(std::chrono::milliseconds(130));
102 
103  EXPECT_TRUE(res);
104  EXPECT_EQ(2, iterations);
105 
106  loopController.stop();
107  });
108  taskAdded = true;
109  } else {
110  std::this_thread::sleep_for(std::chrono::milliseconds(50));
111  iterations++;
112  if (iterations == 2) {
113  baton_ptr->post();
114  }
115  }
116  };
117 
118  loopController.loop(std::move(loopFunc));
119 }
120 
121 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
122  size_t tasksComplete = 0;
123 
124  folly::EventBase evb;
125 
126  FiberManager manager(std::make_unique<EventBaseLoopController>());
127  dynamic_cast<EventBaseLoopController&>(manager.loopController())
128  .attachEventBase(evb);
129 
130  auto task = [&](size_t timeout_ms) {
131  Baton baton;
132 
134  auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
135  auto finish = EventBaseLoopController::Clock::now();
136 
137  EXPECT_FALSE(res);
138 
139  auto duration_ms =
140  std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
141 
142  EXPECT_GT(duration_ms.count(), timeout_ms - 50);
143  EXPECT_LT(duration_ms.count(), timeout_ms + 50);
144 
145  if (++tasksComplete == 2) {
146  evb.terminateLoopSoon();
147  }
148  };
149 
150  evb.runInEventBaseThread([&]() {
151  manager.addTask([&]() { task(500); });
152  manager.addTask([&]() { task(250); });
153  });
154 
155  evb.loopForever();
156 
157  EXPECT_EQ(2, tasksComplete);
158 }
159 
160 TEST(FiberManager, batonTimedWaitPostEvb) {
161  size_t tasksComplete = 0;
162 
163  folly::EventBase evb;
164 
165  FiberManager manager(std::make_unique<EventBaseLoopController>());
166  dynamic_cast<EventBaseLoopController&>(manager.loopController())
167  .attachEventBase(evb);
168 
169  evb.runInEventBaseThread([&]() {
170  manager.addTask([&]() {
171  Baton baton;
172 
173  evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
174 
176  auto res = baton.try_wait_for(std::chrono::milliseconds(130));
177  auto finish = EventBaseLoopController::Clock::now();
178 
179  EXPECT_TRUE(res);
180 
181  auto duration_ms =
182  std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
183 
184  EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
185 
186  if (++tasksComplete == 1) {
187  evb.terminateLoopSoon();
188  }
189  });
190  });
191 
192  evb.loopForever();
193 
194  EXPECT_EQ(1, tasksComplete);
195 }
196 
197 TEST(FiberManager, batonTryWait) {
198  FiberManager manager(std::make_unique<SimpleLoopController>());
199 
200  // Check if try_wait and post work as expected
201  Baton b;
202 
203  manager.addTask([&]() {
204  while (!b.try_wait()) {
205  }
206  });
207  auto thr = std::thread([&]() {
208  std::this_thread::sleep_for(std::chrono::milliseconds(300));
209  b.post();
210  });
211 
212  manager.loopUntilNoReady();
213  thr.join();
214 
215  Baton c;
216 
217  // Check try_wait without post
218  manager.addTask([&]() {
219  int cnt = 100;
220  while (cnt && !c.try_wait()) {
221  cnt--;
222  }
223  EXPECT_TRUE(!c.try_wait()); // must still hold
224  EXPECT_EQ(cnt, 0);
225  });
226 
227  manager.loopUntilNoReady();
228 }
229 
230 TEST(FiberManager, genericBatonFiberWait) {
231  FiberManager manager(std::make_unique<SimpleLoopController>());
232 
233  GenericBaton b;
234  bool fiberRunning = false;
235 
236  manager.addTask([&]() {
237  EXPECT_EQ(manager.hasActiveFiber(), true);
238  fiberRunning = true;
239  b.wait();
240  fiberRunning = false;
241  });
242 
243  EXPECT_FALSE(fiberRunning);
244  manager.loopUntilNoReady();
245  EXPECT_TRUE(fiberRunning); // ensure fiber still active
246 
247  auto thr = std::thread([&]() {
248  std::this_thread::sleep_for(std::chrono::milliseconds(300));
249  b.post();
250  });
251 
252  while (fiberRunning) {
253  manager.loopUntilNoReady();
254  }
255 
256  thr.join();
257 }
258 
259 TEST(FiberManager, genericBatonThreadWait) {
260  FiberManager manager(std::make_unique<SimpleLoopController>());
261  GenericBaton b;
262  std::atomic<bool> threadWaiting(false);
263 
264  auto thr = std::thread([&]() {
265  threadWaiting = true;
266  b.wait();
267  threadWaiting = false;
268  });
269 
270  while (!threadWaiting) {
271  }
272  std::this_thread::sleep_for(std::chrono::milliseconds(300));
273 
274  manager.addTask([&]() {
275  EXPECT_EQ(manager.hasActiveFiber(), true);
276  EXPECT_TRUE(threadWaiting);
277  b.post();
278  while (threadWaiting) {
279  }
280  });
281 
282  manager.loopUntilNoReady();
283  thr.join();
284 }
285 
286 TEST(FiberManager, addTasksNoncopyable) {
287  std::vector<Promise<int>> pendingFibers;
288  bool taskAdded = false;
289 
290  FiberManager manager(std::make_unique<SimpleLoopController>());
291  auto& loopController =
292  dynamic_cast<SimpleLoopController&>(manager.loopController());
293 
294  auto loopFunc = [&]() {
295  if (!taskAdded) {
296  manager.addTask([&]() {
297  std::vector<std::function<std::unique_ptr<int>()>> funcs;
298  for (int i = 0; i < 3; ++i) {
299  funcs.push_back([i, &pendingFibers]() {
300  await([&pendingFibers](Promise<int> promise) {
301  pendingFibers.push_back(std::move(promise));
302  });
303  return std::make_unique<int>(i * 2 + 1);
304  });
305  }
306 
307  auto iter = addTasks(funcs.begin(), funcs.end());
308 
309  size_t n = 0;
310  while (iter.hasNext()) {
311  auto result = iter.awaitNext();
312  EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
313  EXPECT_GE(2 - n, pendingFibers.size());
314  ++n;
315  }
316  EXPECT_EQ(3, n);
317  });
318  taskAdded = true;
319  } else if (pendingFibers.size()) {
320  pendingFibers.back().setValue(0);
321  pendingFibers.pop_back();
322  } else {
323  loopController.stop();
324  }
325  };
326 
327  loopController.loop(std::move(loopFunc));
328 }
329 
330 TEST(FiberManager, awaitThrow) {
331  folly::EventBase evb;
332  struct ExpectedException {};
333  getFiberManager(evb)
334  .addTaskFuture([&] {
335  EXPECT_THROW(
336  await([](Promise<int> p) {
337  p.setValue(42);
338  throw ExpectedException();
339  }),
340  ExpectedException);
341 
342  EXPECT_THROW(
343  await([&](Promise<int> p) {
345  [p = std::move(p)]() mutable { p.setValue(42); });
346  throw ExpectedException();
347  }),
348  ExpectedException);
349  })
350  .waitVia(&evb);
351 }
352 
353 TEST(FiberManager, addTasksThrow) {
354  std::vector<Promise<int>> pendingFibers;
355  bool taskAdded = false;
356 
357  FiberManager manager(std::make_unique<SimpleLoopController>());
358  auto& loopController =
359  dynamic_cast<SimpleLoopController&>(manager.loopController());
360 
361  auto loopFunc = [&]() {
362  if (!taskAdded) {
363  manager.addTask([&]() {
364  std::vector<std::function<int()>> funcs;
365  for (size_t i = 0; i < 3; ++i) {
366  funcs.push_back([i, &pendingFibers]() {
367  await([&pendingFibers](Promise<int> promise) {
368  pendingFibers.push_back(std::move(promise));
369  });
370  if (i % 2 == 0) {
371  throw std::runtime_error("Runtime");
372  }
373  return i * 2 + 1;
374  });
375  }
376 
377  auto iter = addTasks(funcs.begin(), funcs.end());
378 
379  size_t n = 0;
380  while (iter.hasNext()) {
381  try {
382  int result = iter.awaitNext();
383  EXPECT_EQ(1, iter.getTaskID() % 2);
384  EXPECT_EQ(2 * iter.getTaskID() + 1, result);
385  } catch (...) {
386  EXPECT_EQ(0, iter.getTaskID() % 2);
387  }
388  EXPECT_GE(2 - n, pendingFibers.size());
389  ++n;
390  }
391  EXPECT_EQ(3, n);
392  });
393  taskAdded = true;
394  } else if (pendingFibers.size()) {
395  pendingFibers.back().setValue(0);
396  pendingFibers.pop_back();
397  } else {
398  loopController.stop();
399  }
400  };
401 
402  loopController.loop(std::move(loopFunc));
403 }
404 
405 TEST(FiberManager, addTasksVoid) {
406  std::vector<Promise<int>> pendingFibers;
407  bool taskAdded = false;
408 
409  FiberManager manager(std::make_unique<SimpleLoopController>());
410  auto& loopController =
411  dynamic_cast<SimpleLoopController&>(manager.loopController());
412 
413  auto loopFunc = [&]() {
414  if (!taskAdded) {
415  manager.addTask([&]() {
416  std::vector<std::function<void()>> funcs;
417  for (size_t i = 0; i < 3; ++i) {
418  funcs.push_back([&pendingFibers]() {
419  await([&pendingFibers](Promise<int> promise) {
420  pendingFibers.push_back(std::move(promise));
421  });
422  });
423  }
424 
425  auto iter = addTasks(funcs.begin(), funcs.end());
426 
427  size_t n = 0;
428  while (iter.hasNext()) {
429  iter.awaitNext();
430  EXPECT_GE(2 - n, pendingFibers.size());
431  ++n;
432  }
433  EXPECT_EQ(3, n);
434  });
435  taskAdded = true;
436  } else if (pendingFibers.size()) {
437  pendingFibers.back().setValue(0);
438  pendingFibers.pop_back();
439  } else {
440  loopController.stop();
441  }
442  };
443 
444  loopController.loop(std::move(loopFunc));
445 }
446 
447 TEST(FiberManager, addTasksVoidThrow) {
448  std::vector<Promise<int>> pendingFibers;
449  bool taskAdded = false;
450 
451  FiberManager manager(std::make_unique<SimpleLoopController>());
452  auto& loopController =
453  dynamic_cast<SimpleLoopController&>(manager.loopController());
454 
455  auto loopFunc = [&]() {
456  if (!taskAdded) {
457  manager.addTask([&]() {
458  std::vector<std::function<void()>> funcs;
459  for (size_t i = 0; i < 3; ++i) {
460  funcs.push_back([i, &pendingFibers]() {
461  await([&pendingFibers](Promise<int> promise) {
462  pendingFibers.push_back(std::move(promise));
463  });
464  if (i % 2 == 0) {
465  throw std::runtime_error("");
466  }
467  });
468  }
469 
470  auto iter = addTasks(funcs.begin(), funcs.end());
471 
472  size_t n = 0;
473  while (iter.hasNext()) {
474  try {
475  iter.awaitNext();
476  EXPECT_EQ(1, iter.getTaskID() % 2);
477  } catch (...) {
478  EXPECT_EQ(0, iter.getTaskID() % 2);
479  }
480  EXPECT_GE(2 - n, pendingFibers.size());
481  ++n;
482  }
483  EXPECT_EQ(3, n);
484  });
485  taskAdded = true;
486  } else if (pendingFibers.size()) {
487  pendingFibers.back().setValue(0);
488  pendingFibers.pop_back();
489  } else {
490  loopController.stop();
491  }
492  };
493 
494  loopController.loop(std::move(loopFunc));
495 }
496 
497 TEST(FiberManager, addTasksReserve) {
498  std::vector<Promise<int>> pendingFibers;
499  bool taskAdded = false;
500 
501  FiberManager manager(std::make_unique<SimpleLoopController>());
502  auto& loopController =
503  dynamic_cast<SimpleLoopController&>(manager.loopController());
504 
505  auto loopFunc = [&]() {
506  if (!taskAdded) {
507  manager.addTask([&]() {
508  std::vector<std::function<void()>> funcs;
509  for (size_t i = 0; i < 3; ++i) {
510  funcs.push_back([&pendingFibers]() {
511  await([&pendingFibers](Promise<int> promise) {
512  pendingFibers.push_back(std::move(promise));
513  });
514  });
515  }
516 
517  auto iter = addTasks(funcs.begin(), funcs.end());
518 
519  iter.reserve(2);
520  EXPECT_TRUE(iter.hasCompleted());
521  EXPECT_TRUE(iter.hasPending());
522  EXPECT_TRUE(iter.hasNext());
523 
524  iter.awaitNext();
525  EXPECT_TRUE(iter.hasCompleted());
526  EXPECT_TRUE(iter.hasPending());
527  EXPECT_TRUE(iter.hasNext());
528 
529  iter.awaitNext();
530  EXPECT_FALSE(iter.hasCompleted());
531  EXPECT_TRUE(iter.hasPending());
532  EXPECT_TRUE(iter.hasNext());
533 
534  iter.awaitNext();
535  EXPECT_FALSE(iter.hasCompleted());
536  EXPECT_FALSE(iter.hasPending());
537  EXPECT_FALSE(iter.hasNext());
538  });
539  taskAdded = true;
540  } else if (pendingFibers.size()) {
541  pendingFibers.back().setValue(0);
542  pendingFibers.pop_back();
543  } else {
544  loopController.stop();
545  }
546  };
547 
548  loopController.loop(std::move(loopFunc));
549 }
550 
551 TEST(FiberManager, addTaskDynamic) {
552  folly::EventBase evb;
553 
554  Baton batons[3];
555 
556  auto makeTask = [&](size_t taskId) {
557  return [&, taskId]() -> size_t {
558  batons[taskId].wait();
559  return taskId;
560  };
561  };
562 
563  getFiberManager(evb)
564  .addTaskFuture([&]() {
565  TaskIterator<size_t> iterator;
566 
567  iterator.addTask(makeTask(0));
568  iterator.addTask(makeTask(1));
569 
570  batons[1].post();
571 
572  EXPECT_EQ(1, iterator.awaitNext());
573 
574  iterator.addTask(makeTask(2));
575 
576  batons[2].post();
577 
578  EXPECT_EQ(2, iterator.awaitNext());
579 
580  batons[0].post();
581 
582  EXPECT_EQ(0, iterator.awaitNext());
583  })
584  .waitVia(&evb);
585 }
586 
588  std::vector<Promise<int>> pendingFibers;
589  bool taskAdded = false;
590 
591  FiberManager manager(std::make_unique<SimpleLoopController>());
592  auto& loopController =
593  dynamic_cast<SimpleLoopController&>(manager.loopController());
594 
595  auto loopFunc = [&]() {
596  if (!taskAdded) {
597  manager.addTask([&]() {
598  std::vector<std::function<int()>> funcs;
599  for (size_t i = 0; i < 3; ++i) {
600  funcs.push_back([i, &pendingFibers]() {
601  await([&pendingFibers](Promise<int> promise) {
602  pendingFibers.push_back(std::move(promise));
603  });
604  return i * 2 + 1;
605  });
606  }
607 
608  std::vector<std::pair<size_t, int>> results;
609  forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
610  results.emplace_back(id, result);
611  });
612  EXPECT_EQ(3, results.size());
613  EXPECT_TRUE(pendingFibers.empty());
614  for (size_t i = 0; i < 3; ++i) {
615  EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
616  }
617  });
618  taskAdded = true;
619  } else if (pendingFibers.size()) {
620  pendingFibers.back().setValue(0);
621  pendingFibers.pop_back();
622  } else {
623  loopController.stop();
624  }
625  };
626 
627  loopController.loop(std::move(loopFunc));
628 }
629 
631  std::vector<Promise<int>> pendingFibers;
632  bool taskAdded = false;
633 
634  FiberManager manager(std::make_unique<SimpleLoopController>());
635  auto& loopController =
636  dynamic_cast<SimpleLoopController&>(manager.loopController());
637 
638  auto loopFunc = [&]() {
639  if (!taskAdded) {
640  manager.addTask([&]() {
641  std::vector<std::function<int()>> funcs;
642  for (size_t i = 0; i < 3; ++i) {
643  funcs.push_back([i, &pendingFibers]() {
644  await([&pendingFibers](Promise<int> promise) {
645  pendingFibers.push_back(std::move(promise));
646  });
647  return i * 2 + 1;
648  });
649  }
650 
651  auto results = collectN(funcs.begin(), funcs.end(), 2);
652  EXPECT_EQ(2, results.size());
653  EXPECT_EQ(1, pendingFibers.size());
654  for (size_t i = 0; i < 2; ++i) {
655  EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
656  }
657  });
658  taskAdded = true;
659  } else if (pendingFibers.size()) {
660  pendingFibers.back().setValue(0);
661  pendingFibers.pop_back();
662  } else {
663  loopController.stop();
664  }
665  };
666 
667  loopController.loop(std::move(loopFunc));
668 }
669 
670 TEST(FiberManager, collectNThrow) {
671  std::vector<Promise<int>> pendingFibers;
672  bool taskAdded = false;
673 
674  FiberManager manager(std::make_unique<SimpleLoopController>());
675  auto& loopController =
676  dynamic_cast<SimpleLoopController&>(manager.loopController());
677 
678  auto loopFunc = [&]() {
679  if (!taskAdded) {
680  manager.addTask([&]() {
681  std::vector<std::function<int()>> funcs;
682  for (size_t i = 0; i < 3; ++i) {
683  funcs.push_back([&pendingFibers]() -> size_t {
684  await([&pendingFibers](Promise<int> promise) {
685  pendingFibers.push_back(std::move(promise));
686  });
687  throw std::runtime_error("Runtime");
688  });
689  }
690 
691  try {
692  collectN(funcs.begin(), funcs.end(), 2);
693  } catch (...) {
694  EXPECT_EQ(1, pendingFibers.size());
695  }
696  });
697  taskAdded = true;
698  } else if (pendingFibers.size()) {
699  pendingFibers.back().setValue(0);
700  pendingFibers.pop_back();
701  } else {
702  loopController.stop();
703  }
704  };
705 
706  loopController.loop(std::move(loopFunc));
707 }
708 
709 TEST(FiberManager, collectNVoid) {
710  std::vector<Promise<int>> pendingFibers;
711  bool taskAdded = false;
712 
713  FiberManager manager(std::make_unique<SimpleLoopController>());
714  auto& loopController =
715  dynamic_cast<SimpleLoopController&>(manager.loopController());
716 
717  auto loopFunc = [&]() {
718  if (!taskAdded) {
719  manager.addTask([&]() {
720  std::vector<std::function<void()>> funcs;
721  for (size_t i = 0; i < 3; ++i) {
722  funcs.push_back([&pendingFibers]() {
723  await([&pendingFibers](Promise<int> promise) {
724  pendingFibers.push_back(std::move(promise));
725  });
726  });
727  }
728 
729  auto results = collectN(funcs.begin(), funcs.end(), 2);
730  EXPECT_EQ(2, results.size());
731  EXPECT_EQ(1, pendingFibers.size());
732  });
733  taskAdded = true;
734  } else if (pendingFibers.size()) {
735  pendingFibers.back().setValue(0);
736  pendingFibers.pop_back();
737  } else {
738  loopController.stop();
739  }
740  };
741 
742  loopController.loop(std::move(loopFunc));
743 }
744 
745 TEST(FiberManager, collectNVoidThrow) {
746  std::vector<Promise<int>> pendingFibers;
747  bool taskAdded = false;
748 
749  FiberManager manager(std::make_unique<SimpleLoopController>());
750  auto& loopController =
751  dynamic_cast<SimpleLoopController&>(manager.loopController());
752 
753  auto loopFunc = [&]() {
754  if (!taskAdded) {
755  manager.addTask([&]() {
756  std::vector<std::function<void()>> funcs;
757  for (size_t i = 0; i < 3; ++i) {
758  funcs.push_back([&pendingFibers]() {
759  await([&pendingFibers](Promise<int> promise) {
760  pendingFibers.push_back(std::move(promise));
761  });
762  throw std::runtime_error("Runtime");
763  });
764  }
765 
766  try {
767  collectN(funcs.begin(), funcs.end(), 2);
768  } catch (...) {
769  EXPECT_EQ(1, pendingFibers.size());
770  }
771  });
772  taskAdded = true;
773  } else if (pendingFibers.size()) {
774  pendingFibers.back().setValue(0);
775  pendingFibers.pop_back();
776  } else {
777  loopController.stop();
778  }
779  };
780 
781  loopController.loop(std::move(loopFunc));
782 }
783 
785  std::vector<Promise<int>> pendingFibers;
786  bool taskAdded = false;
787 
788  FiberManager manager(std::make_unique<SimpleLoopController>());
789  auto& loopController =
790  dynamic_cast<SimpleLoopController&>(manager.loopController());
791 
792  auto loopFunc = [&]() {
793  if (!taskAdded) {
794  manager.addTask([&]() {
795  std::vector<std::function<int()>> funcs;
796  for (size_t i = 0; i < 3; ++i) {
797  funcs.push_back([i, &pendingFibers]() {
798  await([&pendingFibers](Promise<int> promise) {
799  pendingFibers.push_back(std::move(promise));
800  });
801  return i * 2 + 1;
802  });
803  }
804 
805  auto results = collectAll(funcs.begin(), funcs.end());
806  EXPECT_TRUE(pendingFibers.empty());
807  for (size_t i = 0; i < 3; ++i) {
808  EXPECT_EQ(i * 2 + 1, results[i]);
809  }
810  });
811  taskAdded = true;
812  } else if (pendingFibers.size()) {
813  pendingFibers.back().setValue(0);
814  pendingFibers.pop_back();
815  } else {
816  loopController.stop();
817  }
818  };
819 
820  loopController.loop(std::move(loopFunc));
821 }
822 
823 TEST(FiberManager, collectAllVoid) {
824  std::vector<Promise<int>> pendingFibers;
825  bool taskAdded = false;
826 
827  FiberManager manager(std::make_unique<SimpleLoopController>());
828  auto& loopController =
829  dynamic_cast<SimpleLoopController&>(manager.loopController());
830 
831  auto loopFunc = [&]() {
832  if (!taskAdded) {
833  manager.addTask([&]() {
834  std::vector<std::function<void()>> funcs;
835  for (size_t i = 0; i < 3; ++i) {
836  funcs.push_back([&pendingFibers]() {
837  await([&pendingFibers](Promise<int> promise) {
838  pendingFibers.push_back(std::move(promise));
839  });
840  });
841  }
842 
843  collectAll(funcs.begin(), funcs.end());
844  EXPECT_TRUE(pendingFibers.empty());
845  });
846  taskAdded = true;
847  } else if (pendingFibers.size()) {
848  pendingFibers.back().setValue(0);
849  pendingFibers.pop_back();
850  } else {
851  loopController.stop();
852  }
853  };
854 
855  loopController.loop(std::move(loopFunc));
856 }
857 
859  std::vector<Promise<int>> pendingFibers;
860  bool taskAdded = false;
861 
862  FiberManager manager(std::make_unique<SimpleLoopController>());
863  auto& loopController =
864  dynamic_cast<SimpleLoopController&>(manager.loopController());
865 
866  auto loopFunc = [&]() {
867  if (!taskAdded) {
868  manager.addTask([&]() {
869  std::vector<std::function<int()>> funcs;
870  for (size_t i = 0; i < 3; ++i) {
871  funcs.push_back([i, &pendingFibers]() {
872  await([&pendingFibers](Promise<int> promise) {
873  pendingFibers.push_back(std::move(promise));
874  });
875  if (i == 1) {
876  throw std::runtime_error("This exception will be ignored");
877  }
878  return i * 2 + 1;
879  });
880  }
881 
882  auto result = collectAny(funcs.begin(), funcs.end());
883  EXPECT_EQ(2, pendingFibers.size());
884  EXPECT_EQ(2, result.first);
885  EXPECT_EQ(2 * 2 + 1, result.second);
886  });
887  taskAdded = true;
888  } else if (pendingFibers.size()) {
889  pendingFibers.back().setValue(0);
890  pendingFibers.pop_back();
891  } else {
892  loopController.stop();
893  }
894  };
895 
896  loopController.loop(std::move(loopFunc));
897 }
898 
899 namespace {
900 /* Checks that this function was run from a main context,
901  by comparing an address on a stack to a known main stack address
902  and a known related fiber stack address. The assumption
903  is that fiber stack and main stack will be far enough apart,
904  while any two values on the same stack will be close. */
905 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
906  int here;
907  /* 2 pages is a good guess */
908  constexpr auto const kHereToFiberMaxDist = 0x2000 / sizeof(int);
909 
910  // With ASAN's detect_stack_use_after_return=1, this must be much larger
911  // I measured 410028 on x86_64, so allow for quadruple that, just in case.
912  constexpr auto const kHereToMainMaxDist =
913  folly::kIsSanitizeAddress ? 4 * 410028 : kHereToFiberMaxDist;
914 
915  if (fiberLocation) {
916  EXPECT_GT(std::abs(&here - fiberLocation), kHereToFiberMaxDist);
917  }
918  if (mainLocation) {
919  EXPECT_LT(std::abs(&here - mainLocation), kHereToMainMaxDist);
920  }
921 
922  EXPECT_FALSE(ran);
923  ran = true;
924 }
925 } // namespace
926 
928  FiberManager manager(std::make_unique<SimpleLoopController>());
929  auto& loopController =
930  dynamic_cast<SimpleLoopController&>(manager.loopController());
931 
932  bool checkRan = false;
933 
934  int mainLocation;
935  manager.runInMainContext(
936  [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
937  EXPECT_TRUE(checkRan);
938 
939  checkRan = false;
940 
941  manager.addTask([&]() {
942  struct A {
943  explicit A(int value_) : value(value_) {}
944  A(const A&) = delete;
945  A(A&&) = default;
946 
947  int value;
948  };
949  int stackLocation;
950  auto ret = runInMainContext([&]() {
951  expectMainContext(checkRan, &mainLocation, &stackLocation);
952  return A(42);
953  });
954  EXPECT_TRUE(checkRan);
955  EXPECT_EQ(42, ret.value);
956  });
957 
958  loopController.loop([&]() { loopController.stop(); });
959 
960  EXPECT_TRUE(checkRan);
961 }
962 
964  FiberManager manager(std::make_unique<SimpleLoopController>());
965  auto& loopController =
966  dynamic_cast<SimpleLoopController&>(manager.loopController());
967 
968  bool checkRan = false;
969 
970  int mainLocation;
971 
972  manager.addTaskFinally(
973  [&]() { return 1234; },
974  [&](Try<int>&& result) {
975  EXPECT_EQ(result.value(), 1234);
976 
977  expectMainContext(checkRan, &mainLocation, nullptr);
978  });
979 
980  EXPECT_FALSE(checkRan);
981 
982  loopController.loop([&]() { loopController.stop(); });
983 
984  EXPECT_TRUE(checkRan);
985 }
986 
987 TEST(FiberManager, fibersPoolWithinLimit) {
989  opts.maxFibersPoolSize = 5;
990 
991  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
992  auto& loopController =
993  dynamic_cast<SimpleLoopController&>(manager.loopController());
994 
995  size_t fibersRun = 0;
996 
997  for (size_t i = 0; i < 5; ++i) {
998  manager.addTask([&]() { ++fibersRun; });
999  }
1000  loopController.loop([&]() { loopController.stop(); });
1001 
1002  EXPECT_EQ(5, fibersRun);
1003  EXPECT_EQ(5, manager.fibersAllocated());
1004  EXPECT_EQ(5, manager.fibersPoolSize());
1005 
1006  for (size_t i = 0; i < 5; ++i) {
1007  manager.addTask([&]() { ++fibersRun; });
1008  }
1009  loopController.loop([&]() { loopController.stop(); });
1010 
1011  EXPECT_EQ(10, fibersRun);
1012  EXPECT_EQ(5, manager.fibersAllocated());
1013  EXPECT_EQ(5, manager.fibersPoolSize());
1014 }
1015 
1016 TEST(FiberManager, fibersPoolOverLimit) {
1017  FiberManager::Options opts;
1018  opts.maxFibersPoolSize = 5;
1019 
1020  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1021  auto& loopController =
1022  dynamic_cast<SimpleLoopController&>(manager.loopController());
1023 
1024  size_t fibersRun = 0;
1025 
1026  for (size_t i = 0; i < 10; ++i) {
1027  manager.addTask([&]() { ++fibersRun; });
1028  }
1029 
1030  EXPECT_EQ(0, fibersRun);
1031  EXPECT_EQ(10, manager.fibersAllocated());
1032  EXPECT_EQ(0, manager.fibersPoolSize());
1033 
1034  loopController.loop([&]() { loopController.stop(); });
1035 
1036  EXPECT_EQ(10, fibersRun);
1037  EXPECT_EQ(5, manager.fibersAllocated());
1038  EXPECT_EQ(5, manager.fibersPoolSize());
1039 }
1040 
1041 TEST(FiberManager, remoteFiberBasic) {
1042  FiberManager manager(std::make_unique<SimpleLoopController>());
1043  auto& loopController =
1044  dynamic_cast<SimpleLoopController&>(manager.loopController());
1045 
1046  int result[2];
1047  result[0] = result[1] = 0;
1048  folly::Optional<Promise<int>> savedPromise[2];
1049  manager.addTask([&]() {
1050  result[0] = await(
1051  [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1052  });
1053  manager.addTask([&]() {
1054  result[1] = await(
1055  [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1056  });
1057 
1058  manager.loopUntilNoReady();
1059 
1060  EXPECT_TRUE(savedPromise[0].hasValue());
1061  EXPECT_TRUE(savedPromise[1].hasValue());
1062  EXPECT_EQ(0, result[0]);
1063  EXPECT_EQ(0, result[1]);
1064 
1065  std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1066  std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1067  remoteThread0.join();
1068  remoteThread1.join();
1069  EXPECT_EQ(0, result[0]);
1070  EXPECT_EQ(0, result[1]);
1071  /* Should only have scheduled once */
1072  EXPECT_EQ(1, loopController.remoteScheduleCalled());
1073 
1074  manager.loopUntilNoReady();
1075  EXPECT_EQ(42, result[0]);
1076  EXPECT_EQ(43, result[1]);
1077 }
1078 
1079 TEST(FiberManager, addTaskRemoteBasic) {
1080  FiberManager manager(std::make_unique<SimpleLoopController>());
1081 
1082  int result[2];
1083  result[0] = result[1] = 0;
1084  folly::Optional<Promise<int>> savedPromise[2];
1085 
1086  std::thread remoteThread0{[&]() {
1087  manager.addTaskRemote([&]() {
1088  result[0] = await(
1089  [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1090  });
1091  }};
1092  std::thread remoteThread1{[&]() {
1093  manager.addTaskRemote([&]() {
1094  result[1] = await(
1095  [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1096  });
1097  }};
1098  remoteThread0.join();
1099  remoteThread1.join();
1100 
1101  manager.loopUntilNoReady();
1102 
1103  EXPECT_TRUE(savedPromise[0].hasValue());
1104  EXPECT_TRUE(savedPromise[1].hasValue());
1105  EXPECT_EQ(0, result[0]);
1106  EXPECT_EQ(0, result[1]);
1107 
1108  savedPromise[0]->setValue(42);
1109  savedPromise[1]->setValue(43);
1110 
1111  EXPECT_EQ(0, result[0]);
1112  EXPECT_EQ(0, result[1]);
1113 
1114  manager.loopUntilNoReady();
1115  EXPECT_EQ(42, result[0]);
1116  EXPECT_EQ(43, result[1]);
1117 }
1118 
1119 TEST(FiberManager, remoteHasTasks) {
1120  size_t counter = 0;
1121  FiberManager fm(std::make_unique<SimpleLoopController>());
1122  std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1123 
1124  remote.join();
1125 
1126  while (fm.hasTasks()) {
1127  fm.loopUntilNoReady();
1128  }
1129 
1130  EXPECT_FALSE(fm.hasTasks());
1131  EXPECT_EQ(counter, 1);
1132 }
1133 
1134 TEST(FiberManager, remoteHasReadyTasks) {
1135  int result = 0;
1136  folly::Optional<Promise<int>> savedPromise;
1137  FiberManager fm(std::make_unique<SimpleLoopController>());
1138  std::thread remote([&]() {
1139  fm.addTaskRemote([&]() {
1140  result = await(
1141  [&](Promise<int> promise) { savedPromise = std::move(promise); });
1142  EXPECT_TRUE(fm.hasTasks());
1143  });
1144  });
1145 
1146  remote.join();
1147  EXPECT_TRUE(fm.hasTasks());
1148 
1149  fm.loopUntilNoReady();
1150  EXPECT_TRUE(fm.hasTasks());
1151 
1152  std::thread remote2([&]() { savedPromise->setValue(47); });
1153  remote2.join();
1154  EXPECT_TRUE(fm.hasTasks());
1155 
1156  fm.loopUntilNoReady();
1157  EXPECT_FALSE(fm.hasTasks());
1158 
1159  EXPECT_EQ(result, 47);
1160 }
1161 
// Shared body for the fiber-local tests. `Data` must expose a `value` member
// that starts at 42 (see the structs in the fiberLocal / fiberLocalHeap tests).
// NOTE(review): listing line 1163 (presumably `void testFiberLocal() {`) is
// absent from this extract.
1162 template <typename Data>
1164  FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1165
     // Case 1: tasks spawned with addTask() from inside a fiber are expected to
     // observe the value their parent had at spawn time (43, then 44).
1166  fm.addTask([]() {
1167  EXPECT_EQ(42, local<Data>().value);
1168
1169  local<Data>().value = 43;
1170
1171  addTask([]() {
1172  EXPECT_EQ(43, local<Data>().value);
1173
1174  local<Data>().value = 44;
1175
1176  addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1177  });
1178  });
1179
     // Case 2: same expectation for a task added through addTaskRemote().
1180  fm.addTask([&]() {
1181  EXPECT_EQ(42, local<Data>().value);
1182
1183  local<Data>().value = 43;
1184
1185  fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1186  });
1187
     // Case 3: a task run through collectAny() sees 43, and its write of 44 is
     // expected NOT to be visible in the parent afterwards.
1188  fm.addTask([]() {
1189  EXPECT_EQ(42, local<Data>().value);
1190  local<Data>().value = 43;
1191
1192  auto task = []() {
1193  EXPECT_EQ(43, local<Data>().value);
1194  local<Data>().value = 44;
1195  };
1196  std::vector<std::function<void()>> tasks{task};
1197  collectAny(tasks.begin(), tasks.end());
1198
1199  EXPECT_EQ(43, local<Data>().value);
1200  });
1201
1202  fm.loopUntilNoReady();
1203  EXPECT_FALSE(fm.hasTasks());
1204 }
1205 
// Runs the shared fiber-local scenario with a small payload (a single int).
1206 TEST(FiberManager, fiberLocal) {
1207  struct SimpleData {
1208  int value{42};
1209  };
1210
1211  testFiberLocal<SimpleData>();
1212 }
1213 
// Runs the shared fiber-local scenario with a payload padded by a 1 MiB array.
// NOTE(review): the test name suggests such data is expected to live on the
// heap; that placement is not visible from this file -- confirm in LocalType.
1214 TEST(FiberManager, fiberLocalHeap) {
1215  struct LargeData {
1216  char _[1024 * 1024];
1217  int value{42};
1218  };
1219
1220  testFiberLocal<LargeData>();
1221 }
1222 
// Uses fiber-local data whose destructor itself schedules a new task, and
// expects the manager to still drain completely.
1223 TEST(FiberManager, fiberLocalDestructor) {
1224  struct CrazyData {
1225  size_t data{42};
1226
1227  ~CrazyData() {
     // Only the instance modified by the outer task (data == 41) spawns a task.
1228  if (data == 41) {
1229  addTask([]() {
     // The spawned task is expected to see a default-initialised (42) local.
1230  EXPECT_EQ(42, local<CrazyData>().data);
1231  // Make sure we don't have infinite loop
1232  local<CrazyData>().data = 0;
1233  });
1234  }
1235  }
1236  };
1237
1238  FiberManager fm(
1239  LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1240
1241  fm.addTask([]() { local<CrazyData>().data = 41; });
1242
1243  fm.loopUntilNoReady();
1244  EXPECT_FALSE(fm.hasTasks());
1245 }
1246 
// A task that calls yield() must eventually be resumed: the loop callback
// stops the controller only once the code after yield() has run.
1247 TEST(FiberManager, yieldTest) {
1248  FiberManager manager(std::make_unique<SimpleLoopController>());
1249  auto& loopController =
1250  dynamic_cast<SimpleLoopController&>(manager.loopController());
1251
1252  bool checkRan = false;
1253
1254  manager.addTask([&]() {
1255  manager.yield();
1256  checkRan = true;
1257  });
1258
1259  loopController.loop([&]() {
1260  if (checkRan) {
1261  loopController.stop();
1262  }
1263  });
1264
1265  EXPECT_TRUE(checkRan);
1266 }
1267 
// Checks that each task observes the folly::RequestContext it captured when
// it was added, before and after blocking on a Baton, for addTask,
// addTaskRemote and addTaskFinally.
// NOTE(review): several listing lines are absent from this extract (1281,
// 1288, 1294, 1304, 1320, 1322, 1330 and every second line after 1333), so
// the statements that install or check the contexts are not visible here.
1268 TEST(FiberManager, RequestContext) {
1269  FiberManager fm(std::make_unique<SimpleLoopController>());
1270
1271  bool checkRun1 = false;
1272  bool checkRun2 = false;
1273  bool checkRun3 = false;
1274  bool checkRun4 = false;
1275  folly::fibers::Baton baton1;
1276  folly::fibers::Baton baton2;
1277  folly::fibers::Baton baton3;
1278  folly::fibers::Baton baton4;
1279
     // Task 1: added with addTask().
1280  {
1282  auto rcontext1 = folly::RequestContext::get();
1283  fm.addTask([&, rcontext1]() {
1284  EXPECT_EQ(rcontext1, folly::RequestContext::get());
1285  baton1.wait(
1286  [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1287  EXPECT_EQ(rcontext1, folly::RequestContext::get());
1289  [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1290  checkRun1 = true;
1291  });
1292  }
     // Task 2: added with addTaskRemote().
1293  {
1295  auto rcontext2 = folly::RequestContext::get();
1296  fm.addTaskRemote([&, rcontext2]() {
1297  EXPECT_EQ(rcontext2, folly::RequestContext::get());
1298  baton2.wait();
1299  EXPECT_EQ(rcontext2, folly::RequestContext::get());
1300  checkRun2 = true;
1301  });
1302  }
     // Task 3: addTaskFinally(); both the body and the finally callback check
     // the captured context.
1303  {
1305  auto rcontext3 = folly::RequestContext::get();
1306  fm.addTaskFinally(
1307  [&, rcontext3]() {
1308  EXPECT_EQ(rcontext3, folly::RequestContext::get());
1309  baton3.wait();
1310  EXPECT_EQ(rcontext3, folly::RequestContext::get());
1311
1312  return folly::Unit();
1313  },
1314  [&, rcontext3](Try<folly::Unit>&& /* t */) {
1315  EXPECT_EQ(rcontext3, folly::RequestContext::get());
1316  checkRun3 = true;
1317  });
1318  }
     // Task 4: reads the context inside the fiber and compares it after
     // waking from baton4.
1319  {
1321  fm.addTask([&]() {
1323  auto rcontext4 = folly::RequestContext::get();
1324  baton4.wait();
1325  EXPECT_EQ(rcontext4, folly::RequestContext::get());
1326  checkRun4 = true;
1327  });
1328  }
     // Drive the manager, releasing one baton at a time and checking that the
     // corresponding task ran to completion.
1329  {
1331  auto rcontext = folly::RequestContext::get();
1332
1333  fm.loopUntilNoReady();
1335
1336  baton1.post();
1338  fm.loopUntilNoReady();
1339  EXPECT_TRUE(checkRun1);
1341
1342  baton2.post();
1344  fm.loopUntilNoReady();
1345  EXPECT_TRUE(checkRun2);
1347
1348  baton3.post();
1350  fm.loopUntilNoReady();
1351  EXPECT_TRUE(checkRun3);
1353
1354  baton4.post();
1356  fm.loopUntilNoReady();
1357  EXPECT_TRUE(checkRun4);
1359  }
1360 }
1361 
// Checks the fiber pool counters (fibersAllocated / fibersPoolSize) as the
// manager periodically resizes its pool, with fibersPoolResizePeriodMs = 300
// and maxFibersPoolSize = 5.
// The test relies on real 400 ms sleeps between loop iterations, so it is
// timing-dependent.
1362 TEST(FiberManager, resizePeriodically) {
1363  FiberManager::Options opts;
1364  opts.fibersPoolResizePeriodMs = 300;
1365  opts.maxFibersPoolSize = 5;
1366
1367  FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1368
1369  folly::EventBase evb;
1370  dynamic_cast<EventBaseLoopController&>(manager.loopController())
1371  .attachEventBase(evb);
1372
1373  std::vector<Baton> batons(10);
1374
     // 30 tasks; the first 10 block on a baton, the other 20 finish at once.
1375  size_t tasksRun = 0;
1376  for (size_t i = 0; i < 30; ++i) {
1377  manager.addTask([i, &batons, &tasksRun]() {
1378  ++tasksRun;
1379  // Keep some fibers active indefinitely
1380  if (i < batons.size()) {
1381  batons[i].wait();
1382  }
1383  });
1384  }
1385
     // Nothing has run yet, but a fiber is already allocated per task.
1386  EXPECT_EQ(0, tasksRun);
1387  EXPECT_EQ(30, manager.fibersAllocated());
1388  EXPECT_EQ(0, manager.fibersPoolSize());
1389
1390  evb.loopOnce();
1391  EXPECT_EQ(30, tasksRun);
1392  EXPECT_EQ(30, manager.fibersAllocated());
1393  // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1394  EXPECT_EQ(20, manager.fibersPoolSize());
1395
1396  std::this_thread::sleep_for(std::chrono::milliseconds(400));
1397  evb.loopOnce(); // no fibers active in this period
1398  EXPECT_EQ(30, manager.fibersAllocated());
1399  EXPECT_EQ(20, manager.fibersPoolSize());
1400
1401  std::this_thread::sleep_for(std::chrono::milliseconds(400));
1402  evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1403  EXPECT_EQ(15, manager.fibersAllocated());
1404  EXPECT_EQ(5, manager.fibersPoolSize());
1405
     // Release the 10 blocked fibers; they are expected to return to the pool.
1406  for (size_t i = 0; i < batons.size(); ++i) {
1407  batons[i].post();
1408  }
1409  evb.loopOnce();
1410  EXPECT_EQ(15, manager.fibersAllocated());
1411  EXPECT_EQ(15, manager.fibersPoolSize());
1412
1413  std::this_thread::sleep_for(std::chrono::milliseconds(400));
1414  evb.loopOnce(); // 10 fibers active in last period
1415  EXPECT_EQ(10, manager.fibersAllocated());
1416  EXPECT_EQ(10, manager.fibersPoolSize());
1417
     // A further idle period is expected to shrink down to maxFibersPoolSize.
1418  std::this_thread::sleep_for(std::chrono::milliseconds(400));
1419  evb.loopOnce();
1420  EXPECT_EQ(5, manager.fibersAllocated());
1421  EXPECT_EQ(5, manager.fibersPoolSize());
1422 }
1423 
// A fiber waiting on a Baton with a TimeoutHandler must stay blocked until
// the timeout is scheduled, has elapsed, and the EventBase has been looped.
1424 TEST(FiberManager, batonWaitTimeoutHandler) {
1425  FiberManager manager(std::make_unique<EventBaseLoopController>());
1426
1427  folly::EventBase evb;
1428  dynamic_cast<EventBaseLoopController&>(manager.loopController())
1429  .attachEventBase(evb);
1430
1431  size_t fibersRun = 0;
1432  Baton baton;
1433  Baton::TimeoutHandler timeoutHandler;
1434
1435  manager.addTask([&]() {
1436  baton.wait(timeoutHandler);
1437  ++fibersRun;
1438  });
1439  manager.loopUntilNoReady();
1440
     // No timeout has been scheduled yet, so the fiber is still waiting.
1441  EXPECT_FALSE(baton.try_wait());
1442  EXPECT_EQ(0, fibersRun);
1443
     // Schedule 250 ms and sleep 500 ms; the event loop has not run in between,
     // so the fiber is still expected to be blocked.
1444  timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1445  std::this_thread::sleep_for(std::chrono::milliseconds(500));
1446
1447  EXPECT_FALSE(baton.try_wait());
1448  EXPECT_EQ(0, fibersRun);
1449
1450  evb.loopOnce();
1451  manager.loopUntilNoReady();
1452
1453  EXPECT_EQ(1, fibersRun);
1454 }
1455 
// Starts 10000 fibers that each wait on a Baton with a 1000 ms timeout; the
// last one to wake terminates the event loop.
// NOTE(review): listing line 1472 is absent from this extract, so the call
// that wraps the scheduleTimeout() lambda (closed on line 1474) is not visible.
1456 TEST(FiberManager, batonWaitTimeoutMany) {
1457  FiberManager manager(std::make_unique<EventBaseLoopController>());
1458
1459  folly::EventBase evb;
1460  dynamic_cast<EventBaseLoopController&>(manager.loopController())
1461  .attachEventBase(evb);
1462
1463  constexpr size_t kNumTimeoutTasks = 10000;
1464  size_t tasksCount = kNumTimeoutTasks;
1465
1466  // We add many tasks to hit timeout queue deallocation logic.
1467  for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1468  manager.addTask([&]() {
1469  Baton baton;
1470  Baton::TimeoutHandler timeoutHandler;
1471
1473  timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1474  });
1475
1476  baton.wait(timeoutHandler);
     // Stop the loop once every fiber has come back from its wait.
1477  if (--tasksCount == 0) {
1478  evb.terminateLoopSoon();
1479  }
1480  });
1481  }
1482
1483  evb.loopForever();
1484 }
1485 
// addTaskFuture() and addTaskRemoteFuture() must return futures that yield
// the value returned by the task.
1486 TEST(FiberManager, remoteFutureTest) {
1487  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1488  auto& loopController =
1489  dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1490
1491  int testValue1 = 5;
1492  int testValue2 = 7;
1493  auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1494  auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
     // The callback stops the controller as soon as it is invoked; both
     // futures are expected to be fulfilled by the time loop() returns.
1495  loopController.loop([&]() { loopController.stop(); });
1496  auto v1 = std::move(f1).get();
1497  auto v2 = std::move(f2).get();
1498
1499  EXPECT_EQ(v1, testValue1);
1500  EXPECT_EQ(v2, testValue2);
1501 }
1502 
1503 // Test that a void function produces a Future<Unit>.
// Covers both the local (addTaskFuture) and remote (addTaskRemoteFuture)
// variants; the explicit folly::Future<folly::Unit> variable types are the
// compile-time part of the check.
1504 TEST(FiberManager, remoteFutureVoidUnitTest) {
1505  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1506  auto& loopController =
1507  dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1508
1509  bool ranLocal = false;
1510  folly::Future<folly::Unit> futureLocal =
1511  fiberManager.addTaskFuture([&]() { ranLocal = true; });
1512
1513  bool ranRemote = false;
1514  folly::Future<folly::Unit> futureRemote =
1515  fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1516
1517  loopController.loop([&]() { loopController.stop(); });
1518
1519  futureLocal.wait();
1520  ASSERT_TRUE(ranLocal);
1521
1522  futureRemote.wait();
1523  ASSERT_TRUE(ranRemote);
1524 }
1525 
// Runs an inner EventBase (with its own fiber manager) to completion from
// inside runInMainContext() of a fiber on an outer EventBase.
// NOTE(review): listing lines 1532, 1537 and 1546 -- the arguments of the
// three EXPECT_EQ calls -- are absent from this extract, so what is being
// compared is not visible here.
1526 TEST(FiberManager, nestedFiberManagers) {
1527  folly::EventBase outerEvb;
1528  folly::EventBase innerEvb;
1529
1530  getFiberManager(outerEvb).addTask([&]() {
1531  EXPECT_EQ(
1533
1534  runInMainContext([&]() {
1535  getFiberManager(innerEvb).addTask([&]() {
1536  EXPECT_EQ(
1538
1539  innerEvb.terminateLoopSoon();
1540  });
1541
     // Blocks until the inner task calls terminateLoopSoon().
1542  innerEvb.loopForever();
1543  });
1544
1545  EXPECT_EQ(
1547
1548  outerEvb.terminateLoopSoon();
1549  });
1550
1551  outerEvb.loopForever();
1552 }
1553 
// Shares one Semaphore (10 tokens) between two threads, each running its own
// FiberManager with 10 fibers that wait/signal 10000 times, and checks that a
// per-thread counter stays within [0, kNumTokens).
1554 TEST(FiberManager, semaphore) {
1555  static constexpr size_t kTasks = 10;
1556  static constexpr size_t kIterations = 10000;
1557  static constexpr size_t kNumTokens = 10;
1558
1559  Semaphore sem(kNumTokens);
1560  int counterA = 0;
1561  int counterB = 0;
1562
     // Per-thread body: builds a manager + EventBase, queues the fibers, waits
     // for `baton` from the main thread, then runs the loop.
1563  auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1564  FiberManager manager(std::make_unique<EventBaseLoopController>());
1565  folly::EventBase evb;
1566  dynamic_cast<EventBaseLoopController&>(manager.loopController())
1567  .attachEventBase(evb);
1568
1569  {
     // The shared_ptr owns nothing; its custom deleter stops the loop when
     // the last copy (one is captured by each fiber) is destroyed.
1570  std::shared_ptr<folly::EventBase> completionCounter(
1571  &evb, [](folly::EventBase* evb_) { evb_->terminateLoopSoon(); });
1572
1573  for (size_t i = 0; i < kTasks; ++i) {
1574  manager.addTask([&, completionCounter]() {
1575  for (size_t j = 0; j < kIterations; ++j) {
1576  sem.wait();
1577  ++counter;
1578  sem.signal();
1579  --counter;
1580
1581  EXPECT_LT(counter, kNumTokens);
1582  EXPECT_GE(counter, 0);
1583  }
1584  });
1585  }
1586
1587  baton.wait();
1588  }
1589  evb.loopForever();
1590  };
1591
1592  folly::fibers::Baton batonA;
1593  folly::fibers::Baton batonB;
1594  std::thread threadA([&] { task(counterA, batonA); });
1595  std::thread threadB([&] { task(counterB, batonB); });
1596
     // Release both worker threads, then wait for them to finish.
1597  batonA.post();
1598  batonB.post();
1599  threadA.join();
1600  threadB.join();
1601
1602  EXPECT_LT(counterA, kNumTokens);
1603  EXPECT_LT(counterB, kNumTokens);
1604  EXPECT_GE(counterA, 0);
1605  EXPECT_GE(counterB, 0);
1606 }
1607 
// Submits `index` to a thread_local BatchDispatcher and expects the string
// form of `index` back. The dispatch function also asserts that every batch
// it receives holds exactly `batchSize` items.
// Because the dispatcher is thread_local, `executor` and `batchSize` are
// bound by the first call on each thread (per template instantiation).
1608 template <typename ExecutorT>
1609 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1610  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1611  executor, [=](std::vector<int>&& batch) {
1612  EXPECT_EQ(batchSize, batch.size());
1613  std::vector<std::string> results;
1614  for (auto& it : batch) {
1615  results.push_back(folly::to<std::string>(it));
1616  }
1617  return results;
1618  });
1619
1620  auto indexCopy = index;
1621  auto result = batchDispatcher.add(std::move(indexCopy));
1622  EXPECT_EQ(folly::to<std::string>(index), std::move(result).get());
1623 }
1624 
// Runs two rounds of 10 fibers through singleBatchDispatch(); the second
// round reuses the same thread_local dispatcher and must batch again.
1625 TEST(FiberManager, batchDispatchTest) {
1626  folly::EventBase evb;
1627  auto& executor = getFiberManager(evb);
1628
1629  // Launch multiple fibers with a single id.
1630  executor.add([&]() {
1631  int batchSize = 10;
1632  for (int i = 0; i < batchSize; i++) {
1633  executor.add(
1634  [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1635  }
1636  });
1637  evb.loop();
1638
1639  // Reuse the same BatchDispatcher to batch once again.
1640  executor.add([&]() {
1641  int batchSize = 10;
1642  for (int i = 0; i < batchSize; i++) {
1643  executor.add(
1644  [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1645  }
1646  });
1647  evb.loop();
1648 }
1649 
// Inner stage of the two-level batching test: adds `input` (a group of ints)
// to a thread_local BatchDispatcher and returns the result of add().
// The dispatch function stringifies every element and asserts that the
// element total across the batch equals `totalNumberOfElements`.
// NOTE(review): listing line 1651 (return type and function name) is absent
// from this extract.
1650 template <typename ExecutorT>
1652  ExecutorT& executor,
1653  int totalNumberOfElements,
1654  std::vector<int> input) {
1655  thread_local BatchDispatcher<
1656  std::vector<int>,
1657  std::vector<std::string>,
1658  ExecutorT>
1659  batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1660  std::vector<std::vector<std::string>> results;
1661  int numberOfElements = 0;
1662  for (auto& unit : batch) {
1663  numberOfElements += unit.size();
1664  std::vector<std::string> result;
1665  for (auto& element : unit) {
1666  result.push_back(folly::to<std::string>(element));
1667  }
1668  results.push_back(std::move(result));
1669  }
1670  EXPECT_EQ(totalNumberOfElements, numberOfElements);
1671  return results;
1672  });
1673
1674  return batchDispatcher.add(std::move(input));
1675 }
1676 
// Outer stage of the two-level batching test: adds `index` to a thread_local
// BatchDispatcher whose dispatch function regroups the batch into groups of
// 5, sends each group through doubleBatchInnerDispatch(), and flattens the
// inner results in order.
// NOTE(review): listing lines 1681 (return type and function name) and 1704
// (the call that consumes the futures' begin()/end()) are absent from this
// extract.
1680 template <typename ExecutorT>
1682  ExecutorT& executor,
1683  int totalNumberOfElements,
1684  int index) {
1685  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1686  executor, [=, &executor](std::vector<int>&& batch) {
1687  EXPECT_EQ(totalNumberOfElements, batch.size());
1688  std::vector<std::string> results;
1689  std::vector<folly::Future<std::vector<std::string>>>
1690  innerDispatchResultFutures;
1691
     // Only full groups of 5 are forwarded; a trailing partial group would be
     // dropped (the caller in this file uses 20 elements, a multiple of 5).
1692  std::vector<int> group;
1693  for (auto unit : batch) {
1694  group.push_back(unit);
1695  if (group.size() == 5) {
1696  auto localGroup = group;
1697  group.clear();
1698
1699  innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1700  executor, totalNumberOfElements, localGroup));
1701  }
1702  }
1703
1705  innerDispatchResultFutures.begin(),
1706  innerDispatchResultFutures.end())
1707  .toUnsafeFuture()
1708  .then([&](std::vector<Try<std::vector<std::string>>>
1709  innerDispatchResults) {
1710  for (auto& unit : innerDispatchResults) {
1711  for (auto& element : unit.value()) {
1712  results.push_back(element);
1713  }
1714  }
1715  })
1716  .get();
1717  return results;
1718  });
1719
1720  auto indexCopy = index;
1721  auto result = batchDispatcher.add(std::move(indexCopy));
1722  EXPECT_EQ(folly::to<std::string>(index), std::move(result).get());
1723 }
1724 
// Launches 20 fibers that each call doubleBatchOuterDispatch(), exercising a
// BatchDispatcher whose dispatch function feeds a second BatchDispatcher.
1725 TEST(FiberManager, doubleBatchDispatchTest) {
1726  folly::EventBase evb;
1727  auto& executor = getFiberManager(evb);
1728
1729  // Launch multiple fibers with a single id.
1730  executor.add([&]() {
1731  int totalNumberOfElements = 20;
1732  for (int i = 0; i < totalNumberOfElements; i++) {
1733  executor.add([=, &executor]() {
1734  doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1735  });
1736  }
1737  });
1738  evb.loop();
1739 }
1740 
// Adds `i` to a thread_local BatchDispatcher whose dispatch function always
// throws std::runtime_error, and expects that exception to surface from the
// returned future's get().
// NOTE(review): listing line 1742 (the function signature) is absent from
// this extract; `executor` and `i` are presumably its parameters.
1741 template <typename ExecutorT>
1743  thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1744  executor, [](std::vector<int> &&) -> std::vector<int> {
1745  throw std::runtime_error("Surprise!!");
1746  });
1747
1748  EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1749 }
1750 
// Launches 5 fibers for the throwing-dispatcher scenario.
// NOTE(review): listing line 1760 (the argument of the inner executor.add()
// call) is absent from this extract.
1751 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1752  folly::EventBase evb;
1753  auto& executor = getFiberManager(evb);
1754
1755  // Launch multiple fibers with a single id.
1756  executor.add([&]() {
1757  int totalNumberOfElements = 5;
1758  for (int i = 0; i < totalNumberOfElements; i++) {
1759  executor.add(
1761  }
1762  });
1763  evb.loop();
1764 }
1765 
1767 
// Type aliases shared by the AtomicBatchDispatcher (ABD) test helpers.
// NOTE(review): listing line 1769 (presumably the ResultT alias used below)
// is absent from this extract.
1768 using ValueT = size_t;
// Signature of the user-provided batch function: consumes all inputs of a
// batch and returns one result per input.
1770 using DispatchFunctionT =
1771  folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1772 
// Trace switch for the ABD tests: OUTPUT_TRACE is std::cerr when tracing is
// enabled, otherwise a sink object whose operator<< overloads discard their
// argument and return the sink.
// NOTE(review): listing lines 1777 (presumably `struct DevNullPiper {`) and
// 1779 (the templated operator<< signature) are absent from this extract.
1773 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1774 #if ENABLE_TRACE_IN_TEST
1775 #define OUTPUT_TRACE std::cerr
1776 #else // ENABLE_TRACE_IN_TEST
1778  template <typename T>
1780  return *this;
1781  }
1782
     // Overload taking stream manipulators, so `<< std::endl` also compiles.
1783  DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1784  return *this;
1785  }
1786 } devNullPiper;
1787 #define OUTPUT_TRACE devNullPiper
1788 #endif // ENABLE_TRACE_IN_TEST
1789 
// One unit of work for the ABD tests: a dispatcher token plus its input
// value. Move-only as far as this extract shows (defaulted move operations).
// NOTE(review): listing lines 1791-1792 (presumably the `token` and `input`
// member declarations) and 1807 (the constructor signature) are absent from
// this extract.
1790 struct Job {
1793
     // Simulates preprocessing: yields the fiber until a random amount of
     // clock() time has passed, then throws std::logic_error if `die` is set.
1794  void preprocess(FiberManager& executor, bool die) {
1795  // Yield for a random duration [0, 9] ms to simulate I/O in preprocessing
1796  clock_t msecToDoIO = folly::Random::rand32() % 10;
1797  double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1798  double endAfter = start + msecToDoIO;
1799  while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1800  executor.yield();
1801  }
1802  if (die) {
1803  throw std::logic_error("Simulating preprocessing failure");
1804  }
1805  }
1806
1808  : token(std::move(t)), input(i) {}
1809
1810  Job(Job&&) = default;
1811  Job& operator=(Job&&) = default;
1812 };
1813 
// Converts one input to ResultT via folly::to.
// NOTE(review): listing line 1814 (the function signature) is absent from
// this extract.
1815  return folly::to<ResultT>(std::move(input));
1816 }
1817 
// Default batch function for the ABD tests: maps every input through
// processSingleInput(), preserving order, and returns one result per input.
1818 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1819  size_t expectedCount = inputs.size();
1820  std::vector<ResultT> results;
1821  results.reserve(expectedCount);
1822  for (size_t i = 0; i < expectedCount; ++i) {
1823  results.emplace_back(processSingleInput(std::move(inputs[i])));
1824  }
1825  return results;
1826 }
1827 
// Replaces the contents of `jobs` with `count` new jobs; job i takes a fresh
// token from the dispatcher and uses i as its input.
// NOTE(review): listing line 1828 (return type and function name) is absent
// from this extract.
1829  AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1830  std::vector<Job>& jobs,
1831  size_t count) {
1832  jobs.clear();
1833  for (size_t i = 0; i < count; ++i) {
1834  jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1835  }
1836 }
1837 
// Fault to inject into one job by dispatchJobs().
// NOTE(review): listing lines 1840-1841 are absent from this extract;
// dispatchJobs() refers to PreprocessThrows and DuplicateDispatch, which are
// presumably declared there.
1838 enum class DispatchProblem {
1839  None,
1842 };
1843 
// Queues one fiber per job on `executor`. Each fiber moves its job out of
// `jobs`, preprocesses it, dispatches its token and stores the resulting
// future in results[i]. When `dispatchProblem` is not None, the job at
// `problemIndex` either fails preprocessing (and never dispatches) or
// dispatches twice (the second attempt is expected to throw).
// NOTE(review): listing lines 1844-1845 (return type, name and the leading
// `executor` parameter) are absent from this extract.
1846  std::vector<Job>& jobs,
1847  std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1848  DispatchProblem dispatchProblem = DispatchProblem::None,
1849  size_t problemIndex = size_t(-1)) {
1850  EXPECT_TRUE(
1851  dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1852  results.clear();
1853  results.resize(jobs.size());
1854  for (size_t i = 0; i < jobs.size(); ++i) {
1855  executor.add(
1856  [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1857  try {
1858  Job job(std::move(jobs[i]));
1859
1860  if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1861  if (i == problemIndex) {
     // Returning here leaves results[i] empty; `job` (and its token) is
     // destroyed without dispatch() having been called.
1862  EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1863  return;
1864  }
1865  }
1866
1867  job.preprocess(executor, false);
1868  OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1869  results[i] = job.token.dispatch(job.input);
1870  OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1871
1872  if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1873  if (i == problemIndex) {
1874  EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1875  }
1876  }
     // Deliberately swallows any exception from the fiber body; it is only
     // traced.
1877  } catch (...) {
1878  OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1879  }
1880  });
1881  }
1882 }
1883 
// Reads results[i]->value() (tracing it); any std::exception is traced and
// rethrown so callers can assert on its type.
// Precondition visible from the code: results[i] must be engaged, since it is
// dereferenced unconditionally.
// NOTE(review): listing line 1884 (return type and function name) is absent
// from this extract.
1885  std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1886  size_t i) {
1887  try {
1888  OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1889  << std::endl;
1890  } catch (std::exception& e) {
1891  OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1892  throw;
1893  }
1894 }
1895 
// Failure-path validator: every filled slot in `results` must throw
// TException when its value is read, and the number of filled slots must
// equal `expectedNumResults`. Empty slots are skipped.
// NOTE(review): listing line 1897 (return type and function name) is absent
// from this extract.
1896 template <typename TException>
1898  std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1899  size_t expectedNumResults) {
1900  size_t numResultsFilled = 0;
1901  for (size_t i = 0; i < results.size(); ++i) {
1902  if (!results[i]) {
1903  continue;
1904  }
1905  ++numResultsFilled;
1906  EXPECT_THROW(validateResult(results, i), TException);
1907  }
1908  EXPECT_EQ(numResultsFilled, expectedNumResults);
1909 }
1910 
// Success-path validator: every filled slot i in `results` must be readable
// without throwing and equal processSingleInput(i), and the number of filled
// slots must equal `expectedNumResults`. Empty slots are skipped.
1911 void validateResults(
1912  std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1913  size_t expectedNumResults) {
1914  size_t numResultsFilled = 0;
1915  for (size_t i = 0; i < results.size(); ++i) {
1916  if (!results[i]) {
1917  continue;
1918  }
1919  ++numResultsFilled;
1920  EXPECT_NO_THROW(validateResult(results, i));
     // createJobs() uses the job index as its input, so the slot index is
     // the expected input.
1921  ValueT expectedInput = i;
1922  EXPECT_EQ(
1923  results[i]->value(), processSingleInput(std::move(expectedInput)));
1924  }
1925  EXPECT_EQ(numResultsFilled, expectedNumResults);
1926 }
1927 
1928 } // namespace AtomicBatchDispatcherTesting
1929 
// Common preamble for the ABD_* tests. Expands to declarations of `evb`,
// `executor`, `COUNT` (11), `jobs`, `results` and `dispatchFunc`; the last
// declaration has no trailing semicolon, so the use site supplies it.
1930 #define SET_UP_TEST_FUNC \
1931  using namespace AtomicBatchDispatcherTesting; \
1932  folly::EventBase evb; \
1933  auto& executor = getFiberManager(evb); \
1934  const size_t COUNT = 11; \
1935  std::vector<Job> jobs; \
1936  jobs.reserve(COUNT); \
1937  std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1938  results.reserve(COUNT); \
1939  DispatchFunctionT dispatchFunc
1940 
// Happy path: all COUNT jobs dispatch, commit() is called, and every result
// is filled with the expected value.
// NOTE(review): listing line 1942 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC, which declares the variables used below.
1941 TEST(FiberManager, ABD_Test) {
1943
1944  //
1945  // Testing AtomicBatchDispatcher with explicit call to commit()
1946  //
1947  dispatchFunc = userDispatchFunc;
1948  auto atomicBatchDispatcher =
1949  createAtomicBatchDispatcher(std::move(dispatchFunc));
1950  createJobs(atomicBatchDispatcher, jobs, COUNT);
1951  dispatchJobs(executor, jobs, results);
1952  atomicBatchDispatcher.commit();
1953  evb.loop();
1954  validateResults(results, COUNT);
1955 }
1956 
// The dispatcher goes out of scope (via a thrown exception) before commit();
// every dispatched job's future is expected to hold
// ABDCommitNotCalledException.
// NOTE(review): listing line 1958 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC.
1957 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1959
1960  //
1961  // Testing AtomicBatchDispatcher destroyed before calling commit.
1962  // Handles error cases for:
1963  // - User might have forgotten to add the call to commit() in the code
1964  // - An unexpected exception got thrown in user code before commit() is called
1965  //
1966  try {
1967  dispatchFunc = userDispatchFunc;
1968  auto atomicBatchDispatcher =
1969  createAtomicBatchDispatcher(std::move(dispatchFunc));
1970  createJobs(atomicBatchDispatcher, jobs, COUNT);
1971  dispatchJobs(executor, jobs, results);
1972  throw std::runtime_error(
1973  "Unexpected exception in user code before commit called");
1974  // atomicBatchDispatcher.commit();
1975  } catch (...) {
1976  /* User code handles the exception and does not exit process */
1977  }
     // The job fibers only run here, after the dispatcher has been destroyed.
1978  evb.loop();
1979  validateResults<ABDCommitNotCalledException>(results, COUNT);
1980 }
1981 
// Job #8 fails preprocessing and never dispatches its token; the other
// COUNT - 1 futures are expected to hold ABDTokenNotDispatchedException.
// NOTE(review): listing line 1983 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC.
1982 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1984
1985  //
1986  // Testing preprocessing failure on a job throws
1987  //
1988  dispatchFunc = userDispatchFunc;
1989  auto atomicBatchDispatcher =
1990  createAtomicBatchDispatcher(std::move(dispatchFunc));
1991  createJobs(atomicBatchDispatcher, jobs, COUNT);
1992  dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1993  atomicBatchDispatcher.commit();
1994  evb.loop();
1995  validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
1996 }
1997 
// Job #4 dispatches its token twice; the check (EXPECT_THROW of
// ABDUsageException) is inside dispatchJobs(), so nothing is validated after
// the loop here.
// NOTE(review): listing line 1999 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC.
1998 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
2000
2001  //
2002  // Testing that calling dispatch more than once on the same token throws
2003  //
2004  dispatchFunc = userDispatchFunc;
2005  auto atomicBatchDispatcher =
2006  createAtomicBatchDispatcher(std::move(dispatchFunc));
2007  createJobs(atomicBatchDispatcher, jobs, COUNT);
2008  dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2009  atomicBatchDispatcher.commit();
2010  evb.loop();
2011 }
2012 
// After commit(), getToken() must throw ABDUsageException -- checked right
// after commit, after the jobs are queued, and after the loop has run --
// while the already-issued tokens still produce correct results.
// NOTE(review): listing line 2014 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC.
2013 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2015
2016  //
2017  // Testing that exception set on attempt to call getToken after commit called
2018  //
2019  dispatchFunc = userDispatchFunc;
2020  auto atomicBatchDispatcher =
2021  createAtomicBatchDispatcher(std::move(dispatchFunc));
2022  createJobs(atomicBatchDispatcher, jobs, COUNT);
2023  atomicBatchDispatcher.commit();
2024  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2025  dispatchJobs(executor, jobs, results);
2026  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2027  evb.loop();
2028  validateResults(results, COUNT);
2029  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2030 }
2031 
// The user-supplied batch function throws std::runtime_error after doing its
// work; all COUNT futures are expected to hold that exception.
// NOTE(review): listing line 2033 is absent from this extract; it presumably
// expands SET_UP_TEST_FUNC.
2032 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2034
2035  //
2036  // Testing that exception is set if user provided batch dispatch throws
2037  //
2038  dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2039  (void)userDispatchFunc(std::move(inputs));
2040  throw std::runtime_error("Unexpected exception in user dispatch function");
2041  };
2042  auto atomicBatchDispatcher =
2043  createAtomicBatchDispatcher(std::move(dispatchFunc));
2044  createJobs(atomicBatchDispatcher, jobs, COUNT);
2045  dispatchJobs(executor, jobs, results);
2046  atomicBatchDispatcher.commit();
2047  evb.loop();
2048  validateResults<std::runtime_error>(results, COUNT);
2049 }
2050 
// Runs fibers on two VirtualEventBases and expects destruction to wait for
// pending fiber work: resetting `evb1` completes task 1 only, and leaving the
// scope completes task 2.
// NOTE(review): listing line 2055 (the declaration of `thread`, whose
// getEventBase() is used below) is absent from this extract.
2051 TEST(FiberManager, VirtualEventBase) {
2052  bool done1{false};
2053  bool done2{false};
2054  {
2056
2057  auto evb1 =
2058  std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2059  auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2060
     // Task 1 waits up to 100 ms on a baton nobody posts, then sets done1.
2061  getFiberManager(*evb1).addTaskRemote([&] {
2062  Baton baton;
2063  baton.try_wait_for(std::chrono::milliseconds{100});
2064
2065  done1 = true;
2066  });
2067
     // Task 2 does the same with a 200 ms timeout on the other event base.
2068  getFiberManager(evb2).addTaskRemote([&] {
2069  Baton baton;
2070  baton.try_wait_for(std::chrono::milliseconds{200});
2071
2072  done2 = true;
2073  });
2074
2075  EXPECT_FALSE(done1);
2076  EXPECT_FALSE(done2);
2077
2078  evb1.reset();
2079  EXPECT_TRUE(done1);
2080  EXPECT_FALSE(done2);
2081  }
2082  EXPECT_TRUE(done2);
2083 }
2084 
// Contends one TimedMutex between a plain thread (100 lock/unlock rounds) and
// 100 fibers (20 rounds each, holding the lock across a 1 ms timed wait). The
// test passes if the event loop drains and the thread joins, i.e. no deadlock.
2085 TEST(TimedMutex, ThreadsAndFibersDontDeadlock) {
2086  folly::EventBase evb;
2087  auto& fm = getFiberManager(evb);
2088  TimedMutex mutex;
2089  std::thread testThread([&] {
2090  for (int i = 0; i < 100; i++) {
2091  mutex.lock();
2092  mutex.unlock();
2093  {
2094  Baton b;
2095  b.try_wait_for(std::chrono::milliseconds(1));
2096  }
2097  }
2098  });
2099
2100  for (int numFibers = 0; numFibers < 100; numFibers++) {
2101  fm.addTask([&] {
2102  for (int i = 0; i < 20; i++) {
2103  mutex.lock();
2104  {
2105  Baton b;
2106  b.try_wait_for(std::chrono::milliseconds(1));
2107  }
2108  mutex.unlock();
2109  {
2110  Baton b;
2111  b.try_wait_for(std::chrono::milliseconds(1));
2112  }
2113  }
2114  });
2115  }
2116
2117  evb.loop();
     // Boolean assertion, matching the other hasTasks() checks in this file.
2118  EXPECT_FALSE(fm.hasTasks());
2119  testThread.join();
2120 }
2121 
// The mutex starts locked and is released by a helper thread after 100 ms.
// One fiber queues for the lock, then a second fiber blocks on a 1 s
// timed_lock from the main context. Both must finish within a single
// loopOnce() for the test to pass.
2122 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2123  folly::EventBase evb;
2124  auto& fm = getFiberManager(evb);
2125  TimedMutex mutex;
2126
2127  mutex.lock();
2128  std::thread unlockThread([&] {
2129  /* sleep override */ std::this_thread::sleep_for(
2130  std::chrono::milliseconds{100});
2131  mutex.unlock();
2132  });
2133
2134  fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2135  fm.addTask([&] {
2136  runInMainContext([&] {
2137  auto locked = mutex.timed_lock(std::chrono::seconds{1});
2138  EXPECT_TRUE(locked);
2139  if (locked) {
2140  mutex.unlock();
2141  }
2142  });
2143  });
2144
2145  evb.loopOnce();
     // Boolean assertion, matching the other hasTasks() checks in this file.
2146  EXPECT_FALSE(fm.hasTasks());
2147
2148  unlockThread.join();
2149 }
2150 
// The mutex starts locked. Fiber 1 waits on it with a 1 s timed_lock; fiber 2
// unlocks it and immediately tries a 1 s timed_lock from the main context.
// Both acquisitions must succeed within a single loopOnce().
2151 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2152  folly::EventBase evb;
2153  auto& fm = getFiberManager(evb);
2154  TimedMutex mutex;
2155
2156  mutex.lock();
2157
2158  fm.addTask([&] {
2159  auto locked = mutex.timed_lock(std::chrono::seconds{1});
2160  EXPECT_TRUE(locked);
2161  if (locked) {
2162  mutex.unlock();
2163  }
2164  });
2165  fm.addTask([&] {
2166  mutex.unlock();
2167  runInMainContext([&] {
2168  auto locked = mutex.timed_lock(std::chrono::seconds{1});
2169  EXPECT_TRUE(locked);
2170  if (locked) {
2171  mutex.unlock();
2172  }
2173  });
2174  });
2175
2176  evb.loopOnce();
     // Boolean assertion, matching the other hasTasks() checks in this file.
2177  EXPECT_FALSE(fm.hasTasks());
2178 }
2179 
// File-local helper for the two highWaterMark tests below.
2180 namespace {
2181 // Checks whether stackHighWatermark is set for non-ASAN builds,
2182 // and not set for ASAN builds.
2183 #ifndef FOLLY_SANITIZE_ADDRESS
// Non-ASAN variant: the watermark must be non-zero and strictly greater than
// `minStackSize`.
2184 void expectStackHighWatermark(size_t minStackSize, size_t stackHighWatermark) {
2185  // Check that we properly accounted fiber stack usage
2186  EXPECT_NE(0, stackHighWatermark);
2187  EXPECT_LT(minStackSize, stackHighWatermark);
2188 }
2189 #else
// ASAN variant: the first argument is ignored and the watermark must be 0.
2190 void expectStackHighWatermark(size_t, size_t stackHighWatermark) {
2191  // For ASAN, stackHighWatermark is not tracked.
2192  EXPECT_EQ(0, stackHighWatermark);
2193 }
2194 #endif
2195 } // namespace
2196 
// With opts.recordStackEvery = 1, a task that puts a 1000-int array on its
// stack must push stackHighWatermark() above the array's size (or leave it at
// 0 under ASAN). The body runs on a fresh std::thread.
// NOTE(review): listing line 2204 (presumably the declaration of `opts`) is
// absent from this extract.
2202 TEST(FiberManager, highWaterMarkViaRecordStackEvery) {
2203  auto f = [] {
2205  opts.recordStackEvery = 1;
2206
2207  FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2208  auto& loopController =
2209  dynamic_cast<SimpleLoopController&>(fm.loopController());
2210
2211  static constexpr size_t n = 1000;
2212  int s = 0;
2213  fm.addTask([&]() {
     // Fill and read the array and accumulate into the captured `s`,
     // presumably so the compiler cannot drop the stack buffer.
2214  int b[n] = {0};
2215  for (size_t i = 0; i < n; ++i) {
2216  b[i] = i;
2217  }
2218  for (size_t i = 0; i + 1 < n; ++i) {
2219  s += b[i] * b[i + 1];
2220  }
2221  });
2222
2223  (void)s;
2224
2225  loopController.loop([&]() { loopController.stop(); });
2226  expectStackHighWatermark(n * sizeof(int), fm.stackHighWatermark());
2227  };
2228  std::thread(f).join();
2229 }
2230 
// Same stack-usage check as highWaterMarkViaRecordStackEvery, but with
// default options: the watermark is expected to be recorded because the task
// calls runInMainContext() after using the 1000-int array. The body runs on a
// fresh std::thread.
2235 TEST(FiberManager, highWaterMarkViaRecordCurrentPosition) {
2236  auto f = [] {
2237  FiberManager fm(std::make_unique<SimpleLoopController>());
2238  auto& loopController =
2239  dynamic_cast<SimpleLoopController&>(fm.loopController());
2240
2241  static constexpr size_t n = 1000;
2242  int s = 0;
2243  fm.addTask([&]() {
2244  int b[n] = {0};
2245  for (size_t i = 0; i < n; ++i) {
2246  b[i] = i;
2247  }
2248  for (size_t i = 0; i + 1 < n; ++i) {
2249  s += b[i] * b[i + 1];
2250  }
2251  // Calls preempt, which calls recordStackPosition.
2252  fm.runInMainContext([]() {});
2253  });
2254
2255  (void)s;
2256
2257  loopController.loop([&]() { loopController.stop(); });
2258  expectStackHighWatermark(n * sizeof(int), fm.stackHighWatermark());
2259  };
2260  std::thread(f).join();
2261 }
std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, typename std::pair< size_t, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > >::type collectAny(InputIterator first, InputIterator last)
Definition: WhenN-inl.h:192
std::vector< typename std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > >::type > collectAll(InputIterator first, InputIterator last)
Definition: WhenN-inl.h:147
void validateResult(std::vector< folly::Optional< folly::Future< ResultT >>> &results, size_t i)
auto f
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
Definition: Request.cpp:227
std::unique_ptr< int > A
#define EXPECT_NO_THROW(statement)
Definition: gtest.h:1845
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
void doubleBatchOuterDispatch(ExecutorT &executor, int totalNumberOfElements, int index)
char b
size_t fibersAllocated() const
std::vector< ResultT > userDispatchFunc(std::vector< ValueT > &&inputs)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
#define SET_UP_TEST_FUNC
Future< ResultT > add(ValueT value)
AtomicBatchDispatcher< InputT, ResultT > createAtomicBatchDispatcher(folly::Function< std::vector< ResultT >(std::vector< InputT > &&)> dispatchFunc, size_t initialCapacity)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
std::list< AtForkTask > tasks
Definition: AtFork.cpp:81
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
std::vector< typename std::enable_if< !std::is_same< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type >, void >::value, typename std::pair< size_t, invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > >::type > collectN(InputIterator first, InputIterator last, size_t n)
Definition: WhenN-inl.h:34
folly::Future< std::vector< std::string > > doubleBatchInnerDispatch(ExecutorT &executor, int totalNumberOfElements, std::vector< int > input)
STL namespace.
invoke_result_t< F > runInMainContext(F &&func)
constexpr bool kIsSanitizeAddress
Definition: Portability.h:118
#define OUTPUT_TRACE
detail::Batch batch(size_t batchSize)
Definition: Base-inl.h:2602
void addTaskFinally(F &&func, G &&finally)
SemiFuture< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAllSemiFuture(Fs &&...fs)
Definition: Future-inl.h:1441
folly::std T
void add(folly::Func f) override
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
void addTaskFinally(F &&func, G &&finally)
DevNullPiper & operator<<(std::ostream &(*)(std::ostream &))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void preprocess(FiberManager &executor, bool die)
auto addTaskFuture(F &&func) -> folly::Future< typename folly::lift_unit< invoke_result_t< F >>::type >
Future< T > & wait()&
Definition: Future-inl.h:2197
Single-threaded task execution engine.
void scheduleTimeout(TimeoutController::Duration timeoutMs)
Definition: Baton.cpp:192
void testFiberLocal()
EventBase * evb_
void validateResults(std::vector< folly::Optional< folly::Future< ResultT >>> &results, size_t expectedNumResults)
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
void dispatchJobs(FiberManager &executor, std::vector< Job > &jobs, std::vector< folly::Optional< folly::Future< ResultT >>> &results, DispatchProblem dispatchProblem=DispatchProblem::None, size_t problemIndex=size_t(-1))
void terminateLoopSoon()
Definition: EventBase.cpp:493
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
TaskIterator< invoke_result_t< typename std::iterator_traits< InputIterator >::value_type > > addTasks(InputIterator first, InputIterator last)
Definition: AddTasks-inl.h:114
void forEach(InputIterator first, InputIterator last, F &&f)
Definition: ForEach-inl.h:41
constexpr Unit unit
Definition: Unit.h:45
LoopController & loopController()
auto start
Definition: Try.h:51
void addTask(F &&func)
int * count
Optional< NamedGroup > group
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
invoke_result_t< F > runInMainContext(F &&func)
AtomicBatchDispatcher< ValueT, ResultT >::Token token
std::mutex mutex
std::atomic< int > counter
const char * string
Definition: Conv.cpp:212
struct AtomicBatchDispatcherTesting::DevNullPiper devNullPiper
static FiberManager * getFiberManagerUnsafe()
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
static set< string > s
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
const internal::AnythingMatcher _
size_t stackHighWatermark() const
void createJobs(AtomicBatchDispatcher< ValueT, ResultT > &atomicBatchDispatcher, std::vector< Job > &jobs, size_t count)
Job(AtomicBatchDispatcher< ValueT, ResultT >::Token &&t, ValueT i)
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void batchDispatchExceptionHandling(ExecutorT &executor, int i)
bool timed_lock(const std::chrono::duration< Rep, Period > &duration)
static uint32_t rand32()
Definition: Random.h:213
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
char c
ResultT processSingleInput(ValueT &&input)
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout)
Definition: Baton.h:105
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static RequestContext * get()
Definition: Request.cpp:290
folly::VirtualEventBase & getVirtualEventBase()
Definition: EventBase.cpp:768
void singleBatchDispatch(ExecutorT &executor, int batchSize, int index)
size_t fibersPoolSize() const
constexpr detail::First first
Definition: Base-inl.h:2553
auto addTaskRemoteFuture(F &&func) -> folly::Future< typename folly::lift_unit< invoke_result_t< F >>::type >
FirstArgOf< F >::type::value_type await(F &&func)
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934
FiberManager & getFiberManager(EventBase &evb, const FiberManager::Options &opts)