27 using namespace folly;
30 explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex_) : ex(ex_) {}
41 std::shared_ptr<ManualExecutor>
ex;
50 th = std::thread([=] {
60 eastExecutor->add([=]() {});
64 void addAsync(
int a,
int b, std::function<
void(
int&&)>&& cob) {
65 eastExecutor->add([=]() { cob(a + b); });
70 std::shared_ptr<ManualWaiter>
waiter;
76 TEST(Via, exceptionOnLaunch) {
77 auto future = makeFuture<int>(std::runtime_error(
"E"));
83 return t.value() == 1;
114 EXPECT_EQ(
f.value(),
"start;static;class-static;class");
118 auto westThreadId = std::this_thread::get_id();
119 auto f =
via(eastExecutor.get())
121 EXPECT_NE(std::this_thread::get_id(), westThreadId);
122 return makeFuture<int>(1);
124 .
via(westExecutor.get())
126 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
133 auto westThreadId = std::this_thread::get_id();
134 auto f =
via(eastExecutor.get())
135 .thenValue([=](
auto&&) {
136 EXPECT_NE(std::this_thread::get_id(), westThreadId);
141 .via(westExecutor.get())
142 .then([=](
int v)
mutable {
143 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
151 EXPECT_NE(std::this_thread::get_id(), westThreadId);
154 .
via(westExecutor.get())
157 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
165 auto f =
via(eastExecutor.get());
171 auto f2 =
f.via(eastExecutor.get());
201 int mid = getNumPriorities() / 2;
202 int p = priority < 0 ?
std::max(0, mid + priority)
203 :
std::min(getNumPriorities() - 1, mid + priority);
227 via(&exe, -1).thenValue([](
auto&&) {});
228 via(&exe, 0).thenValue([](
auto&&) {});
229 via(&exe, 1).thenValue([](
auto&&) {});
230 via(&exe, 42).thenValue([](
auto&&) {});
231 via(&exe, -42).thenValue(
233 via(&exe).thenValue([](
auto&&) {});
245 .thenMultiWithExecutor(eastExecutor.get(), [] {
return 42; })
250 auto westThreadId = std::this_thread::get_id();
252 auto f =
via(westExecutor.get())
253 .thenMultiWithExecutor(
256 EXPECT_NE(std::this_thread::get_id(), westThreadId);
265 .thenValue([&](
auto&&) {
266 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
275 bool a =
false,
b =
false,
c =
false;
277 .thenValue([&](
auto&&) { a =
true; })
278 .then(&x2, [&](
auto&&) {
b =
true; })
279 .thenValue([&](
auto&&) {
c =
true; });
312 #ifndef __APPLE__ // TODO #7372389 316 std::atomic<bool> done{
false};
324 while (!funcs.isEmpty()) {
325 funcs.blockingRead(fn);
353 std::unique_ptr<int>
val =
355 .thenValue([](
auto&&) {
return std::make_unique<int>(42); })
365 auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
366 std::vector<Future<Unit>>
futures;
368 for (
auto& p : *promises) {
369 futures.emplace_back(p.getFuture().via(&x).then([](
Try<Unit>&&) {}));
374 for (
auto& p : *promises) {
399 auto f =
via(&x).thenValue([](
auto&&) {
return true; });
406 auto f =
via(&x).then();
429 auto f =
via(&x).thenValue([](
auto&&) {
return 23; });
437 auto f =
via(&x).then();
439 auto t =
f.getTryVia(&x);
451 TEST(Via, SimpleTimedGetTryVia) {
461 auto f =
via(&x).then();
470 auto f =
via(&x).then().waitVia(&x);
484 auto tid = std::this_thread::get_id();
492 .then([&](
Try<Unit>&&) { done =
true; });
495 std::thread t2([&] { p.
setValue(); });
504 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
510 auto captured_promise_future = captured_promise.
getFuture();
514 [
c =
std::move(captured_promise)](
auto&&) {
return 42; });
518 std::move(captured_promise_future).
get(std::chrono::seconds(5)),
522 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
528 auto captured_promise_future = captured_promise.
getFuture();
532 auto future = trigger.
getFuture().via(&x).thenValue(
533 [
c =
std::move(captured_promise)](
auto&&) {
return 42; });
538 std::move(captured_promise_future).
get(std::chrono::seconds(5)),
542 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
549 auto captured_promise_future = captured_promise.
getFuture();
555 [
c =
std::move(captured_promise)](
auto&&) {
return 42; });
561 std::move(captured_promise_future).
get(std::chrono::seconds(5)),
565 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
572 auto captured_promise_future = captured_promise.
getFuture();
578 future = trigger.
getFuture().via(&x).thenValue(
579 [
c =
std::move(captured_promise)](
auto&&) {
return 42; });
586 std::move(captured_promise_future).
get(std::chrono::seconds(5)),
608 via(&x, []() ->
int {
throw std::runtime_error(
"expected"); }).getVia(&x),
625 via(&x, [&] { count++; }).getVia(&x);
633 auto f =
via(&x, [&] { count++; });
636 std::move(
f).thenValue([&](
auto&&) { count++; });
644 auto intp = std::make_unique<int>(42);
649 TEST(ViaFunc, valueKeepAlive) {
654 TEST(ViaFunc, thenValueKeepAlive) {
659 .thenValue([](
auto&&) {
return 42; })
std::shared_ptr< ManualExecutor > eastExecutor
#define EXPECT_THROW(statement, expected_exception)
InlineExecutor inlineExecutor
~ThreadExecutor() override
#define EXPECT_EQ(val1, val2)
static const int8_t LO_PRI
constexpr detail::Map< Move > move
std::shared_ptr< ManualWaiter > waiter
—— Concurrent Priority Queue Implementation ——
#define EXPECT_GE(val1, val2)
uint8_t getNumPriorities() const override
std::shared_ptr< ManualExecutor > ex
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
Simple executor that does work in another thread.
folly::MPMCQueue< Func > funcs
ThreadExecutor(size_t n=1024)
static Future< std::string > doWorkStatic(Try< std::string > &&t)
ManualWaiter(std::shared_ptr< ManualExecutor > ex_)
void addWithPriority(Func f, int8_t priority) override
Future< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAll(Fs &&...fs)
std::shared_ptr< ManualExecutor > westExecutor
TEST_F(AsyncSSLSocketWriteTest, write_coalescing1)
#define EXPECT_TRUE(condition)
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
#define EXPECT_NE(val1, val2)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
#define EXPECT_FALSE(condition)
static const int8_t HI_PRI
#define EXPECT_LT(val1, val2)
void add(Func fn) override
#define ASSERT_TRUE(condition)
void add(Func f) override
TEST(SequencedExecutor, CPUThreadPoolExecutor)
void addAsync(int a, int b, std::function< void(int &&)> &&cob)
Future< typename std::decay< T >::type > makeFuture(T &&t)
SemiFuture< typename std::decay< T >::type > makeSemiFuture(T &&t)