proxygen
ViaTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thread>
18 
19 #include <folly/MPMCQueue.h>
23 #include <folly/futures/Future.h>
26 
27 using namespace folly;
28 
29 struct ManualWaiter : public DrivableExecutor {
30  explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex_) : ex(ex_) {}
31 
32  void add(Func f) override {
33  ex->add(std::move(f));
34  }
35 
36  void drive() override {
37  ex->wait();
38  ex->run();
39  }
40 
41  std::shared_ptr<ManualExecutor> ex;
42 };
43 
44 struct ViaFixture : public testing::Test {
46  : westExecutor(new ManualExecutor),
47  eastExecutor(new ManualExecutor),
48  waiter(new ManualWaiter(westExecutor)),
49  done(false) {
50  th = std::thread([=] {
51  ManualWaiter eastWaiter(eastExecutor);
52  while (!done) {
53  eastWaiter.drive();
54  }
55  });
56  }
57 
58  ~ViaFixture() override {
59  done = true;
60  eastExecutor->add([=]() {});
61  th.join();
62  }
63 
64  void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65  eastExecutor->add([=]() { cob(a + b); });
66  }
67 
68  std::shared_ptr<ManualExecutor> westExecutor;
69  std::shared_ptr<ManualExecutor> eastExecutor;
70  std::shared_ptr<ManualWaiter> waiter;
72  std::atomic<bool> done;
73  std::thread th;
74 };
75 
76 TEST(Via, exceptionOnLaunch) {
77  auto future = makeFuture<int>(std::runtime_error("E"));
78  EXPECT_THROW(future.value(), std::runtime_error);
79 }
80 
81 TEST(Via, thenValue) {
82  auto future = makeFuture(std::move(1)).then([](Try<int>&& t) {
83  return t.value() == 1;
84  });
85 
86  EXPECT_TRUE(future.value());
87 }
88 
89 TEST(Via, thenFuture) {
90  auto future = makeFuture(1).then(
91  [](Try<int>&& t) { return makeFuture(t.value() == 1); });
92  EXPECT_TRUE(future.value());
93 }
94 
96  return makeFuture(t.value() + ";static");
97 }
98 
// Chains three callables through then(): the file-level doWorkStatic, the
// static Worker::doWorkStatic, and the member Worker::doWork bound to `w`,
// then checks that the suffixes were appended in that order.
// NOTE(review): listing lines 101 and 104 (the signatures of the two Worker
// methods) are absent from this copy; restore them from the upstream file
// before compiling.
99 TEST(Via, thenFunction) {
100  struct Worker {
// (listing line 101 missing: presumably the signature of Worker::doWork)
102  return makeFuture(t.value() + ";class");
103  }
// (listing line 104 missing: presumably the signature of Worker::doWorkStatic)
105  return makeFuture(t.value() + ";class-static");
106  }
107  } w;
108 
109  auto f = makeFuture(std::string("start"))
110  .then(doWorkStatic)
111  .then(Worker::doWorkStatic)
112  .then(&Worker::doWork, &w);
113 
114  EXPECT_EQ(f.value(), "start;static;class-static;class");
115 }
116 
117 TEST_F(ViaFixture, threadHops) {
118  auto westThreadId = std::this_thread::get_id();
119  auto f = via(eastExecutor.get())
120  .then([=](Try<Unit>&& /* t */) {
121  EXPECT_NE(std::this_thread::get_id(), westThreadId);
122  return makeFuture<int>(1);
123  })
124  .via(westExecutor.get())
125  .then([=](Try<int>&& t) {
126  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
127  return t.value();
128  });
129  EXPECT_EQ(f.getVia(waiter.get()), 1);
130 }
131 
132 TEST_F(ViaFixture, chainVias) {
133  auto westThreadId = std::this_thread::get_id();
134  auto f = via(eastExecutor.get())
135  .thenValue([=](auto&&) {
136  EXPECT_NE(std::this_thread::get_id(), westThreadId);
137  return 1;
138  })
139  .then([=](int val) {
140  return makeFuture(val)
141  .via(westExecutor.get())
142  .then([=](int v) mutable {
143  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
144  return v + 1;
145  });
146  })
147  .then([=](int val) {
148  // even though ultimately the future that triggers this one
149  // executed in the west thread, this then() inherited the
150  // executor from its predecessor, ie the eastExecutor.
151  EXPECT_NE(std::this_thread::get_id(), westThreadId);
152  return val + 1;
153  })
154  .via(westExecutor.get())
155  .then([=](int val) {
156  // go back to west, so we can wait on it
157  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
158  return val + 1;
159  });
160 
161  EXPECT_EQ(f.getVia(waiter.get()), 4);
162 }
163 
164 TEST_F(ViaFixture, bareViaAssignment) {
165  auto f = via(eastExecutor.get());
166 }
167 TEST_F(ViaFixture, viaAssignment) {
168  // via()&&
169  auto f = makeFuture().via(eastExecutor.get());
170  // via()&
171  auto f2 = f.via(eastExecutor.get());
172 }
173 
174 TEST(Via, chain1) {
175  EXPECT_EQ(42, makeFuture().thenMulti([] { return 42; }).get());
176 }
177 
178 TEST(Via, chain3) {
179  int count = 0;
180  auto f = makeFuture().thenMulti(
181  [&] {
182  count++;
183  return 3.14159;
184  },
185  [&](double) {
186  count++;
187  return std::string("hello");
188  },
189  [&] {
190  count++;
191  return makeFuture(42);
192  });
193  EXPECT_EQ(42, std::move(f).get());
194  EXPECT_EQ(3, count);
195 }
196 
197 struct PriorityExecutor : public Executor {
198  void add(Func /* f */) override {}
199 
200  void addWithPriority(Func f, int8_t priority) override {
201  int mid = getNumPriorities() / 2;
202  int p = priority < 0 ? std::max(0, mid + priority)
203  : std::min(getNumPriorities() - 1, mid + priority);
204  EXPECT_LT(p, 3);
205  EXPECT_GE(p, 0);
206  if (p == 0) {
207  count0++;
208  } else if (p == 1) {
209  count1++;
210  } else if (p == 2) {
211  count2++;
212  }
213  f();
214  }
215 
216  uint8_t getNumPriorities() const override {
217  return 3;
218  }
219 
220  int count0{0};
221  int count1{0};
222  int count2{0};
223 };
224 
225 TEST(Via, priority) {
227  via(&exe, -1).thenValue([](auto&&) {});
228  via(&exe, 0).thenValue([](auto&&) {});
229  via(&exe, 1).thenValue([](auto&&) {});
230  via(&exe, 42).thenValue([](auto&&) {}); // overflow should go to max priority
231  via(&exe, -42).thenValue(
232  [](auto&&) {}); // underflow should go to min priority
233  via(&exe).thenValue([](auto&&) {}); // default to mid priority
234  via(&exe, Executor::LO_PRI).thenValue([](auto&&) {});
235  via(&exe, Executor::HI_PRI).thenValue([](auto&&) {});
236  EXPECT_EQ(3, exe.count0);
237  EXPECT_EQ(2, exe.count1);
238  EXPECT_EQ(3, exe.count2);
239 }
240 
241 TEST_F(ViaFixture, chainX1) {
242  EXPECT_EQ(
243  42,
244  makeFuture()
245  .thenMultiWithExecutor(eastExecutor.get(), [] { return 42; })
246  .get());
247 }
248 
249 TEST_F(ViaFixture, chainX3) {
250  auto westThreadId = std::this_thread::get_id();
251  int count = 0;
252  auto f = via(westExecutor.get())
253  .thenMultiWithExecutor(
254  eastExecutor.get(),
255  [&] {
256  EXPECT_NE(std::this_thread::get_id(), westThreadId);
257  count++;
258  return 3.14159;
259  },
260  [&](double) {
261  count++;
262  return std::string("hello");
263  },
264  [&] { count++; })
265  .thenValue([&](auto&&) {
266  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
267  return makeFuture(42);
268  });
269  EXPECT_EQ(42, f.getVia(waiter.get()));
270  EXPECT_EQ(3, count);
271 }
272 
273 TEST(Via, then2) {
274  ManualExecutor x1, x2;
275  bool a = false, b = false, c = false;
276  via(&x1)
277  .thenValue([&](auto&&) { a = true; })
278  .then(&x2, [&](auto&&) { b = true; })
279  .thenValue([&](auto&&) { c = true; });
280 
281  EXPECT_FALSE(a);
282  EXPECT_FALSE(b);
283 
284  x1.run();
285  EXPECT_TRUE(a);
286  EXPECT_FALSE(b);
287  EXPECT_FALSE(c);
288 
289  x2.run();
290  EXPECT_TRUE(b);
291  EXPECT_FALSE(c);
292 
293  x1.run();
294  EXPECT_TRUE(c);
295 }
296 
// then(executor, member-function pointer, object): Foo::foo must not run
// until the executor `x` is run.
// NOTE(review): listing line 305, which declares `x`, is absent from this
// copy; presumably a ManualExecutor (it is driven with run()) - confirm
// against the upstream file.
297 TEST(Via, then2Variadic) {
298  struct Foo {
299  bool a = false;
300  void foo(Try<Unit>) {
301  a = true;
302  }
303  };
304  Foo f;
306  makeFuture().then(&x, &Foo::foo, &f);
307  EXPECT_FALSE(f.a);
308  x.run();
309  EXPECT_TRUE(f.a);
310 }
311 
312 #ifndef __APPLE__ // TODO #7372389
313 class ThreadExecutor : public Executor {
316  std::atomic<bool> done{false};
317  std::thread worker;
319 
320  void work() {
321  baton.post();
322  Func fn;
323  while (!done) {
324  while (!funcs.isEmpty()) {
325  funcs.blockingRead(fn);
326  fn();
327  }
328  }
329  }
330 
331  public:
332  explicit ThreadExecutor(size_t n = 1024) : funcs(n) {
333  worker = std::thread(std::bind(&ThreadExecutor::work, this));
334  }
335 
336  ~ThreadExecutor() override {
337  done = true;
338  funcs.write([] {});
339  worker.join();
340  }
341 
342  void add(Func fn) override {
343  funcs.blockingWrite(std::move(fn));
344  }
345 
346  void waitForStartup() {
347  baton.wait();
348  }
349 };
350 
// via().thenValue().get() must deliver the move-only result produced by the
// callback.
// NOTE(review): listing line 352, which declares `x`, is absent from this
// copy; presumably a ThreadExecutor, since get() is called without driving
// `x` - confirm against the upstream file.
351 TEST(Via, viaThenGetWasRacy) {
353  std::unique_ptr<int> val =
354  folly::via(&x)
355  .thenValue([](auto&&) { return std::make_unique<int>(42); })
356  .get();
357  ASSERT_TRUE(!!val);
358  EXPECT_EQ(42, *val);
359 }
360 
361 TEST(Via, callbackRace) {
363 
364  auto fn = [&x] {
365  auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
366  std::vector<Future<Unit>> futures;
367 
368  for (auto& p : *promises) {
369  futures.emplace_back(p.getFuture().via(&x).then([](Try<Unit>&&) {}));
370  }
371 
372  x.waitForStartup();
373  x.add([promises] {
374  for (auto& p : *promises) {
375  p.setValue();
376  }
377  });
378 
379  return collectAll(futures);
380  };
381 
382  fn().wait();
383 }
384 #endif
385 
// Body of an executor class whose add() discards the task and whose drive()
// only records that it was called.
// NOTE(review): listing line 386 with the class header is absent from this
// copy; the class name and base are not visible here (both members are
// marked override) - restore from the upstream file.
387  public:
// Drops the task without running it.
388  void add(Func /* f */) override {}
389  void drive() override {
390  ran = true;
391  }
// True once drive() has been called.
392  bool ran{false};
393 };
394 
// getVia() for a value-returning chain, a then() chain with no callback, and
// an already-completed future (where the executor must not be driven).
// NOTE(review): listing lines 398, 405 and 411, which declare `x` in each
// scope, are absent from this copy; the last `x` must have a `ran` member -
// confirm the types against the upstream file.
395 TEST(Via, getVia) {
396  {
397  // non-void
399  auto f = via(&x).thenValue([](auto&&) { return true; });
400  EXPECT_TRUE(f.getVia(&x));
401  }
402 
403  {
404  // void
406  auto f = via(&x).then();
407  f.getVia(&x);
408  }
409 
410  {
412  auto f = makeFuture(true);
413  EXPECT_TRUE(f.getVia(&x));
414  EXPECT_FALSE(x.ran);
415  }
416 }
417 
// getVia() with a one-second timeout on a never-fulfilled promise is
// expected to throw FutureTimeout.
// NOTE(review): listing lines 419-420, which declare `e2` and `p`, are
// absent from this copy; their types are not visible here - restore from
// the upstream file.
418 TEST(Via, SimpleTimedGetVia) {
421  auto f = p.getFuture();
422  EXPECT_THROW(f.getVia(&e2, std::chrono::seconds(1)), FutureTimeout);
423 }
424 
// getTryVia() for a value-returning chain, a then() chain with no callback,
// and an already-completed future (where the executor must not be driven).
// NOTE(review): listing lines 428, 436 and 444, which declare `x` in each
// scope, are absent from this copy; the last `x` must have a `ran` member -
// confirm the types against the upstream file.
425 TEST(Via, getTryVia) {
426  {
427  // non-void
429  auto f = via(&x).thenValue([](auto&&) { return 23; });
430  EXPECT_FALSE(f.isReady());
431  EXPECT_EQ(23, f.getTryVia(&x).value());
432  }
433 
434  {
435  // void
437  auto f = via(&x).then();
438  EXPECT_FALSE(f.isReady());
439  auto t = f.getTryVia(&x);
440  EXPECT_TRUE(t.hasValue());
441  }
442 
443  {
445  auto f = makeFuture(23);
446  EXPECT_EQ(23, f.getTryVia(&x).value());
447  EXPECT_FALSE(x.ran);
448  }
449 }
450 
// getTryVia() with a one-second timeout on a never-fulfilled promise is
// expected to throw FutureTimeout.
// NOTE(review): listing lines 452-453, which declare `e2` and `p`, are
// absent from this copy; their types are not visible here - restore from
// the upstream file.
451 TEST(Via, SimpleTimedGetTryVia) {
454  auto f = p.getFuture();
455  EXPECT_THROW(f.getTryVia(&e2, std::chrono::seconds(1)), FutureTimeout);
456 }
457 
// waitVia() on an lvalue future, on an rvalue future, and on an
// already-completed future (where the executor must not be driven).
// NOTE(review): listing lines 460, 469 and 475, which declare `x` in each
// scope, are absent from this copy; the last `x` must have a `ran` member -
// confirm the types against the upstream file.
458 TEST(Via, waitVia) {
459  {
461  auto f = via(&x).then();
462  EXPECT_FALSE(f.isReady());
463  f.waitVia(&x);
464  EXPECT_TRUE(f.isReady());
465  }
466 
467  {
468  // try rvalue as well
470  auto f = via(&x).then().waitVia(&x);
471  EXPECT_TRUE(f.isReady());
472  }
473 
474  {
476  makeFuture(true).waitVia(&x);
477  EXPECT_FALSE(x.ran);
478  }
479 }
480 
// One thread attaches the via()/then() chain while another fulfils the
// promise; every callback must still run on the test thread, which is the
// one calling x.run().
// NOTE(review): listing line 482, which declares `x`, is absent from this
// copy; presumably a ManualExecutor (it is driven with run()) - confirm
// against the upstream file.
481 TEST(Via, viaRaces) {
483  Promise<Unit> p;
484  auto tid = std::this_thread::get_id();
485  bool done = false;
486 
487  std::thread t1([&] {
488  p.getFuture()
489  .via(&x)
490  .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
491  .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
492  .then([&](Try<Unit>&&) { done = true; });
493  });
494 
495  std::thread t2([&] { p.setValue(); });
496 
// Keep driving the executor on this thread until the last callback has run.
497  while (!done) {
498  x.run();
499  }
500  t1.join();
501  t2.join();
502 }
503 
// The source future already holds a value when the callback is attached;
// both the chained future and the future of the captured promise are
// expected to fail with BrokenPromise.
// NOTE(review): listing line 512, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
504 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
505  // The callback object will get destroyed when passed to the executor.
506 
507  // A promise will be captured by the callback lambda so we can observe that
508  // it will be destroyed.
509  Promise<Unit> captured_promise;
510  auto captured_promise_future = captured_promise.getFuture();
511 
513  auto future = makeFuture().via(&x).thenValue(
514  [c = std::move(captured_promise)](auto&&) { return 42; });
515 
516  EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
517  EXPECT_THROW(
518  std::move(captured_promise_future).get(std::chrono::seconds(5)),
519  BrokenPromise);
520 }
521 
// The callback is attached before `trigger` is fulfilled; both the chained
// future and the future of the captured promise are expected to fail with
// BrokenPromise.
// NOTE(review): listing line 530, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
522 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
523  // The callback object will get destroyed when passed to the executor.
524 
525  // A promise will be captured by the callback lambda so we can observe that
526  // it will be destroyed.
527  Promise<Unit> captured_promise;
528  auto captured_promise_future = captured_promise.getFuture();
529 
531  Promise<Unit> trigger;
532  auto future = trigger.getFuture().via(&x).thenValue(
533  [c = std::move(captured_promise)](auto&&) { return 42; });
534  trigger.setValue();
535 
536  EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
537  EXPECT_THROW(
538  std::move(captured_promise_future).get(std::chrono::seconds(5)),
539  BrokenPromise);
540 }
541 
// The executor's queued work is cleared before it runs; both the chained
// future and the future of the captured promise are expected to fail with
// BrokenPromise. The source future already holds a value here.
// NOTE(review): listing line 553, which declares `x` inside the inner
// scope, is absent from this copy; the comment below suggests a
// ManualExecutor - confirm against the upstream file.
542 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
543  // The callback object will get destroyed when the ManualExecutor runs out
544  // of scope.
545 
546  // A promise will be captured by the callback lambda so we can observe that
547  // it will be destroyed.
548  Promise<Unit> captured_promise;
549  auto captured_promise_future = captured_promise.getFuture();
550 
551  Optional<Future<int>> future;
552  {
554  future = makeFuture().via(&x).thenValue(
555  [c = std::move(captured_promise)](auto&&) { return 42; });
556  x.clear();
557  }
558 
559  EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
560  EXPECT_THROW(
561  std::move(captured_promise_future).get(std::chrono::seconds(5)),
562  BrokenPromise);
563 }
564 
// The executor's queued work is cleared before it runs; both the chained
// future and the future of the captured promise are expected to fail with
// BrokenPromise. The callback is attached before `trigger` is fulfilled.
// NOTE(review): listing line 576, which declares `x` inside the inner
// scope, is absent from this copy; the comment below suggests a
// ManualExecutor - confirm against the upstream file.
565 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
566  // The callback object will get destroyed when the ManualExecutor runs out
567  // of scope.
568 
569  // A promise will be captured by the callback lambda so we can observe that
570  // it will be destroyed.
571  Promise<Unit> captured_promise;
572  auto captured_promise_future = captured_promise.getFuture();
573 
574  Optional<Future<int>> future;
575  {
577  Promise<Unit> trigger;
578  future = trigger.getFuture().via(&x).thenValue(
579  [c = std::move(captured_promise)](auto&&) { return 42; });
580  trigger.setValue();
581  x.clear();
582  }
583 
584  EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
585  EXPECT_THROW(
586  std::move(captured_promise_future).get(std::chrono::seconds(5)),
587  BrokenPromise);
588 }
589 
// via(executor, func) with a void callable yields Future<Unit>; the
// callable does not run until the executor is run.
// NOTE(review): listing line 591, which declares `x`, is absent from this
// copy; presumably a ManualExecutor (it is driven with run()) - confirm
// against the upstream file.
590 TEST(ViaFunc, liftsVoid) {
592  int count = 0;
593  Future<Unit> f = via(&x, [&] { count++; });
594 
595  EXPECT_EQ(0, count);
596  x.run();
597  EXPECT_EQ(1, count);
598 }
599 
// via(executor, func) delivers the callable's return value.
// NOTE(review): listing line 601, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
600 TEST(ViaFunc, value) {
602  EXPECT_EQ(42, via(&x, [] { return 42; }).getVia(&x));
603 }
604 
// An exception thrown by the callable is rethrown from getVia().
// NOTE(review): listing line 606, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
605 TEST(ViaFunc, exception) {
607  EXPECT_THROW(
608  via(&x, []() -> int { throw std::runtime_error("expected"); }).getVia(&x),
609  std::runtime_error);
610 }
611 
// A callable that returns a Future has its value delivered directly.
// NOTE(review): listing line 613, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
612 TEST(ViaFunc, future) {
614  EXPECT_EQ(42, via(&x, [] { return makeFuture(42); }).getVia(&x));
615 }
616 
// A callable that returns a SemiFuture has its value delivered directly.
// NOTE(review): listing line 618, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
617 TEST(ViaFunc, semi_future) {
619  EXPECT_EQ(42, via(&x, [] { return makeSemiFuture(42); }).getVia(&x));
620 }
621 
// getVia() on a void callable runs the callable exactly once.
// NOTE(review): listing line 623, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
622 TEST(ViaFunc, voidFuture) {
624  int count = 0;
625  via(&x, [&] { count++; }).getVia(&x);
626  EXPECT_EQ(1, count);
627 }
628 
// A continuation attached after the via() future has completed still waits
// for the executor to be run again before it executes.
// NOTE(review): listing line 630, which declares `x`, is absent from this
// copy; presumably a ManualExecutor (it is driven with run()) - confirm
// against the upstream file.
629 TEST(ViaFunc, isSticky) {
631  int count = 0;
632 
633  auto f = via(&x, [&] { count++; });
634  x.run();
635 
636  std::move(f).thenValue([&](auto&&) { count++; });
637  EXPECT_EQ(1, count);
638  x.run();
639  EXPECT_EQ(2, count);
640 }
641 
// via(executor, func) accepts a callable with a move-only capture.
// NOTE(review): listing line 643, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
642 TEST(ViaFunc, moveOnly) {
644  auto intp = std::make_unique<int>(42);
645 
646  EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
647 }
648 
// via() also accepts the keep-alive token returned by getKeepAliveToken().
// NOTE(review): listing line 650, which declares `x`, is absent from this
// copy; its type is not visible here - restore from the upstream file.
649 TEST(ViaFunc, valueKeepAlive) {
651  EXPECT_EQ(42, via(getKeepAliveToken(&x), [] { return 42; }).getVia(&x));
652 }
653 
// A thenValue() continuation followed by getVia(&x) is expected to yield 42.
// NOTE(review): listing lines 655 (declaration of `x`) and 658 (the head of
// the call chain that thenValue() is applied to) are absent from this copy;
// judging by the test name the chain presumably starts from a keep-alive
// token - restore from the upstream file.
654 TEST(ViaFunc, thenValueKeepAlive) {
656  EXPECT_EQ(
657  42,
659  .thenValue([](auto&&) { return 42; })
660  .getVia(&x));
661 }
std::shared_ptr< ManualExecutor > eastExecutor
Definition: ViaTest.cpp:69
std::atomic< bool > done
Definition: ViaTest.cpp:72
auto f
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
char b
LogLevel max
Definition: LogLevel.cpp:31
InlineExecutor inlineExecutor
Definition: ViaTest.cpp:71
~ThreadExecutor() override
Definition: ViaTest.cpp:336
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static const int8_t LO_PRI
Definition: Executor.h:48
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
const int x
void add(Func) override
Definition: ViaTest.cpp:388
void add(Func) override
Definition: ViaTest.cpp:198
std::shared_ptr< ManualWaiter > waiter
Definition: ViaTest.cpp:70
double val
Definition: String.cpp:273
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
ViaFixture()
Definition: ViaTest.cpp:45
uint8_t getNumPriorities() const override
Definition: ViaTest.cpp:216
std::shared_ptr< ManualExecutor > ex
Definition: ViaTest.cpp:41
void doWork(int work)
void drive() override
Definition: ViaTest.cpp:36
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:170
std::thread th
Definition: ViaTest.cpp:73
Simple executor that does work in another thread.
Definition: ViaTest.cpp:314
Function< void()> Func
Definition: Executor.h:27
folly::MPMCQueue< Func > funcs
Definition: ViaTest.cpp:315
auto const foo
Definition: LazyTest.cpp:49
LogLevel min
Definition: LogLevel.cpp:30
ThreadExecutor(size_t n=1024)
Definition: ViaTest.cpp:332
static Future< std::string > doWorkStatic(Try< std::string > &&t)
Definition: ViaTest.cpp:95
ManualWaiter(std::shared_ptr< ManualExecutor > ex_)
Definition: ViaTest.cpp:30
void addWithPriority(Func f, int8_t priority) override
Definition: ViaTest.cpp:200
Future< T > getFuture()
Definition: Promise-inl.h:97
Future< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAll(Fs &&...fs)
Definition: Future-inl.h:1477
char a
std::shared_ptr< ManualExecutor > westExecutor
Definition: ViaTest.cpp:68
Definition: Try.h:51
void post() noexcept
Definition: Baton.h:123
TEST_F(AsyncSSLSocketWriteTest, write_coalescing1)
int * count
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void waitForStartup()
Definition: ViaTest.cpp:346
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
Definition: Promise.h:326
folly::Baton baton
Definition: ViaTest.cpp:318
std::thread worker
Definition: ViaTest.cpp:317
const char * string
Definition: Conv.cpp:212
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
void drive() override
Definition: ViaTest.cpp:389
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
Definition: Future-inl.h:1290
InlineExecutor exe
Definition: Benchmark.cpp:337
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
Definition: Executor.h:200
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static const int8_t HI_PRI
Definition: Executor.h:50
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
void add(Func fn) override
Definition: ViaTest.cpp:342
char c
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void add(Func f) override
Definition: ViaTest.cpp:32
TEST(SequencedExecutor, CPUThreadPoolExecutor)
void addAsync(int a, int b, std::function< void(int &&)> &&cob)
Definition: ViaTest.cpp:64
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
SemiFuture< typename std::decay< T >::type > makeSemiFuture(T &&t)
Definition: Future-inl.h:712
~ViaFixture() override
Definition: ViaTest.cpp:58