proxygen
NewThreadTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <type_traits>
18 
19 #include <chrono>
20 using namespace std::literals;
21 
31 
35 
36 using namespace folly::pushmi::aliases;
37 
38 #include <folly/Conv.h>
39 
42 
43 using namespace testing;
44 
46  explicit countdownsingle(int& c) : counter(&c) {}
47 
48  int* counter;
49 
50  template <class ExecutorRef>
51  void operator()(ExecutorRef exec) {
52  if (--*counter > 0) {
53  exec | op::submit(*this);
54  }
55  }
56 };
57 
58 using NT = decltype(mi::new_thread());
59 
60 inline auto make_time(mi::time_source<>& t, NT& ex) {
61  return t.make(mi::systemNowF{}, [ex]() { return ex; })();
62 }
63 
64 class NewthreadExecutor : public Test {
65  public:
66  ~NewthreadExecutor() override {
67  time_.join();
68  }
69 
70  protected:
72 
73  NT nt_{mi::new_thread()};
75  TNT tnt_{make_time(time_, nt_)};
76 };
77 
78 TEST_F(NewthreadExecutor, BlockingSubmitNow) {
79  auto signals = 0;
80  auto start = v::now(tnt_);
81  auto signaled = start;
82  tnt_ | op::transform([](auto tnt) { return tnt | ep::now(); }) |
84  [&](auto at) {
85  signaled = at;
86  signals += 100;
87  },
88  [&](auto) noexcept { signals += 1000; },
89  [&]() { signals += 10; });
90 
91  EXPECT_THAT(signals, Eq(110))
92  << "expected that the value and done signals are recorded once and the value signal did not drift much";
93  auto delay =
94  std::chrono::duration_cast<std::chrono::milliseconds>((signaled - start))
95  .count();
96  EXPECT_THAT(delay, Lt(1000)) << "The delay is " << delay;
97 }
98 
99 TEST_F(NewthreadExecutor, BlockingGetNow) {
100  auto start = v::now(tnt_);
101  auto signaled = tnt_ | op::transform([](auto tnt) { return v::now(tnt); }) |
102  op::get<std::chrono::system_clock::time_point>;
103 
104  auto delay =
105  std::chrono::duration_cast<std::chrono::milliseconds>((signaled - start))
106  .count();
107 
108  EXPECT_THAT(delay, Lt(1000)) << "The delay is " << delay;
109 }
110 
111 TEST_F(NewthreadExecutor, SubmissionsAreOrderedInTime) {
112  std::vector<std::string> times;
113  std::atomic<int> pushed{0};
114  auto push = [&](int time) {
115  return v::on_value([&, time](auto) {
116  times.push_back(folly::to<std::string>(time));
117  ++pushed;
118  });
119  };
120  tnt_ | op::submit(v::on_value([push](auto tnt) {
121  auto now = tnt | ep::now();
122  tnt | op::submit_after(40ms, push(40)) |
123  op::submit_at(now + 10ms, push(10)) | op::submit_after(20ms, push(20)) |
124  op::submit_at(now + 10ms, push(11));
125  }));
126 
127  while (pushed.load() < 4) {
129  }
130 
131  EXPECT_THAT(times, ElementsAre("10", "11", "20", "40"))
132  << "expected that the items were pushed in time order not insertion order";
133 }
134 
135 TEST_F(NewthreadExecutor, NowIsCalled) {
136  bool done = false;
137  tnt_ | ep::now();
138  tnt_ | op::blocking_submit([&](auto tnt) {
139  tnt | ep::now();
140  done = true;
141  });
142 
143  EXPECT_THAT(done, Eq(true)) << "exptected that both calls to now() complete";
144 }
145 
146 TEST_F(NewthreadExecutor, BlockingSubmit) {
147  auto signals = 0;
148  nt_ | op::transform([](auto) { return 42; }) |
150  [&](auto) { signals += 100; },
151  [&](auto) noexcept { signals += 1000; },
152  [&]() { signals += 10; });
153 
154  EXPECT_THAT(signals, Eq(110))
155  << "the value and done signals are recorded once";
156 }
157 
158 TEST_F(NewthreadExecutor, BlockingGet) {
159  auto v = nt_ | op::transform([](auto) { return 42; }) | op::get<int>;
160 
161  EXPECT_THAT(v, Eq(42)) << "expected that the result would be different";
162 }
163 
164 TEST_F(NewthreadExecutor, VirtualDerecursion) {
165  int counter = 100'000;
166  std::function<void(::folly::pushmi::any_executor_ref<> exec)> recurse;
167  recurse = [&](::folly::pushmi::any_executor_ref<> nt) {
168  if (--counter <= 0)
169  return;
170  nt | op::submit(recurse);
171  };
172  nt_ | op::blocking_submit([&](auto nt) { recurse(nt); });
173 
174  EXPECT_THAT(counter, Eq(0))
175  << "expected that all nested submissions complete";
176 }
177 
178 TEST_F(NewthreadExecutor, StaticDerecursion) {
179  int counter = 100'000;
180  countdownsingle single{counter};
182 
183  EXPECT_THAT(counter, Eq(0))
184  << "expected that all nested submissions complete";
185 }
186 
187 TEST_F(NewthreadExecutor, UsedWithOn) {
188  std::vector<std::string> values;
189  auto sender = ::folly::pushmi::make_single_sender([](auto out) {
190  ::folly::pushmi::set_value(out, 2.0);
192  // ignored
196  });
197  sender | op::on([&]() { return nt_; }) |
199  [&](auto v) { values.push_back(folly::to<std::string>(v)); }));
200 
201  EXPECT_THAT(values, ElementsAre(folly::to<std::string>(2.0)))
202  << "expected that only the first item was pushed";
203 }
204 
205 TEST_F(NewthreadExecutor, UsedWithVia) {
206  std::vector<std::string> values;
207  auto sender = ::folly::pushmi::make_single_sender([](auto out) {
208  ::folly::pushmi::set_value(out, 2.0);
210  // ignored
214  });
215  sender | op::via(mi::strands(nt_)) |
217  [&](auto v) { values.push_back(folly::to<std::string>(v)); }));
218 
219  EXPECT_THAT(values, ElementsAre(folly::to<std::string>(2.0)))
220  << "expected that only the first item was pushed";
221 }
TEST_F(NewthreadExecutor, BlockingSubmitNow)
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
auto on_value(Fns...fns) -> on_value_fn< Fns... >
Definition: boosters.h:262
LogLevel max
Definition: LogLevel.cpp:31
internal::EqMatcher< T > Eq(T x)
void operator()(ExecutorRef exec)
decltype(nt) NT
std::chrono::steady_clock::time_point now()
requires E e noexcept(noexcept(s.error(std::move(e))))
auto make_time(mi::time_source<> &t, NT &ex)
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
~NewthreadExecutor() override
PUSHMI_INLINE_VAR constexpr detail::via_fn via
Definition: via.h:166
internal::LtMatcher< Rhs > Lt(Rhs x)
PUSHMI_INLINE_VAR constexpr detail::blocking_submit_fn blocking_submit
Definition: submit.h:389
LogLevel min
Definition: LogLevel.cpp:30
new_thread_executor new_thread()
Definition: new_thread.h:50
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
auto start
countdownsingle single
#define EXPECT_THAT(value, matcher)
std::atomic< int > counter
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
countdownsingle(int &c)
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
Future< Unit > times(const int n, F &&thunk)
Definition: Future-inl.h:2348
PUSHMI_INLINE_VAR constexpr detail::submit_after_fn submit_after
Definition: submit.h:388
char c
internal::ElementsAreMatcher< ::testing::tuple<> > ElementsAre()
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
Definition: time_source.h:545
PUSHMI_INLINE_VAR constexpr detail::on_fn on
Definition: on.h:100
std::chrono::nanoseconds time()
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_single_sender_fn make_single_sender
std::vector< int > values(1'000)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done