proxygen
WindowTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <boost/thread/barrier.hpp>

#include <folly/Conv.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>

#include <array>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
25 
26 using namespace folly;
27 
29 static eggs_t eggs("eggs");
30 
31 TEST(Window, basic) {
32  // int -> Future<int>
33  auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
34  auto res =
35  reduce(
36  window(input, [](int i) { return makeFuture(i); }, window_size),
37  0,
38  [](int sum, const Try<int>& b) { return sum + *b; })
39  .get();
40  EXPECT_EQ(expect, res);
41  };
42  {
43  SCOPED_TRACE("2 in-flight at a time");
44  std::vector<int> input = {1, 2, 3};
45  fn(input, 2, 6);
46  }
47  {
48  SCOPED_TRACE("4 in-flight at a time");
49  std::vector<int> input = {1, 2, 3};
50  fn(input, 4, 6);
51  }
52  {
53  SCOPED_TRACE("empty input");
54  std::vector<int> input;
55  fn(input, 1, 0);
56  }
57  {
58  // int -> Future<Unit>
59  auto res = reduce(
60  window(
61  std::vector<int>({1, 2, 3}),
62  [](int /* i */) { return makeFuture(); },
63  2),
64  0,
65  [](int sum, const Try<Unit>& b) {
66  EXPECT_TRUE(b.hasValue());
67  return sum + 1;
68  })
69  .get();
70  EXPECT_EQ(3, res);
71  }
72  {
73  // string -> return Future<int>
74  auto res = reduce(
75  window(
76  std::vector<std::string>{"1", "2", "3"},
77  [](std::string s) {
78  return makeFuture<int>(folly::to<int>(s));
79  },
80  2),
81  0,
82  [](int sum, const Try<int>& b) { return sum + *b; })
83  .get();
84  EXPECT_EQ(6, res);
85  }
86  {
87  // string -> return SemiFuture<int>
88  auto res = reduce(
89  window(
90  std::vector<std::string>{"1", "2", "3"},
91  [](std::string s) {
92  return makeSemiFuture<int>(folly::to<int>(s));
93  },
94  2),
95  0,
96  [](int sum, const Try<int>& b) { return sum + *b; })
97  .get();
98  EXPECT_EQ(6, res);
99  }
100  {
101  SCOPED_TRACE("repeat same fn");
102  auto res =
103  reduce(
104  window(
105  5UL,
106  [](size_t iteration) { return folly::makeFuture(iteration); },
107  2),
108  0UL,
109  [](size_t sum, const Try<size_t>& b) { return sum + b.value(); })
110  .get();
111  EXPECT_EQ(0 + 1 + 2 + 3 + 4, res);
112  }
113 }
114 
115 TEST(Window, exception) {
116  std::vector<int> ints = {1, 2, 3, 4};
117  std::vector<Promise<int>> ps(4);
118 
119  auto res = reduce(
120  window(
121  ints,
122  [&ps](int i) {
123  if (i > 2) {
124  throw std::runtime_error("exception should not kill process");
125  }
126  return ps[i].getFuture();
127  },
128  2),
129  0,
130  [](int sum, const Try<int>& b) {
131  sum += b.hasException<std::exception>() ? 1 : 0;
132  return sum;
133  });
134 
135  for (auto& p : ps) {
136  p.setValue(0);
137  }
138 
139  // Should have received 2 exceptions.
140  EXPECT_EQ(2, std::move(res).get());
141 }
142 
143 TEST(Window, stackOverflow) {
144  // Number of futures to spawn.
145  static constexpr size_t m = 1000;
146  // Size of each block of input and output.
147  static constexpr size_t n = 1000;
148 
149  std::vector<std::array<int, n>> ints;
150  int64_t expectedSum = 0;
151  for (size_t i = 0; i < m; i++) {
152  std::array<int, n> next{};
153  next[i % n] = i;
154  ints.emplace_back(next);
155  expectedSum += i;
156  }
157 
158  // Try to overflow window's executor.
159  auto res = reduce(
160  window(
161  ints,
162  [](std::array<int, n> i) {
164  },
165  1),
166  static_cast<int64_t>(0),
167  [](int64_t sum, const Try<std::array<int, n>>& b) {
168  for (int a : b.value()) {
169  sum += a;
170  }
171  return sum;
172  });
173 
174  EXPECT_EQ(std::move(res).get(), expectedSum);
175 }
176 
177 TEST(Window, parallel) {
178  std::vector<int> input;
179  std::vector<Promise<int>> ps(10);
180  for (size_t i = 0; i < ps.size(); i++) {
181  input.emplace_back(i);
182  }
183  auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
184 
185  std::vector<std::thread> ts;
186  boost::barrier barrier(ps.size() + 1);
187  for (size_t i = 0; i < ps.size(); i++) {
188  ts.emplace_back([&ps, &barrier, i]() {
189  barrier.wait();
190  ps[i].setValue(i);
191  });
192  }
193 
194  barrier.wait();
195 
196  for (auto& t : ts) {
197  t.join();
198  }
199 
200  EXPECT_TRUE(f.isReady());
201  for (size_t i = 0; i < ps.size(); i++) {
202  EXPECT_EQ(i, f.value()[i]);
203  }
204 }
205 
206 TEST(Window, parallelWithError) {
207  std::vector<int> input;
208  std::vector<Promise<int>> ps(10);
209  for (size_t i = 0; i < ps.size(); i++) {
210  input.emplace_back(i);
211  }
212  auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
213 
214  std::vector<std::thread> ts;
215  boost::barrier barrier(ps.size() + 1);
216  for (size_t i = 0; i < ps.size(); i++) {
217  ts.emplace_back([&ps, &barrier, i]() {
218  barrier.wait();
219  if (i == (ps.size() / 2)) {
220  ps[i].setException(eggs);
221  } else {
222  ps[i].setValue(i);
223  }
224  });
225  }
226 
227  barrier.wait();
228 
229  for (auto& t : ts) {
230  t.join();
231  }
232 
233  EXPECT_TRUE(f.isReady());
234  EXPECT_THROW(f.value(), eggs_t);
235 }
236 
237 TEST(Window, allParallelWithError) {
238  std::vector<int> input;
239  std::vector<Promise<int>> ps(10);
240  for (size_t i = 0; i < ps.size(); i++) {
241  input.emplace_back(i);
242  }
243  auto f =
244  collectAll(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
245 
246  std::vector<std::thread> ts;
247  boost::barrier barrier(ps.size() + 1);
248  for (size_t i = 0; i < ps.size(); i++) {
249  ts.emplace_back([&ps, &barrier, i]() {
250  barrier.wait();
251  if (i == (ps.size() / 2)) {
252  ps[i].setException(eggs);
253  } else {
254  ps[i].setValue(i);
255  }
256  });
257  }
258 
259  barrier.wait();
260 
261  for (auto& t : ts) {
262  t.join();
263  }
264 
265  EXPECT_TRUE(f.isReady());
266  for (size_t i = 0; i < ps.size(); i++) {
267  if (i == (ps.size() / 2)) {
268  EXPECT_THROW(f.value()[i].value(), eggs_t);
269  } else {
270  EXPECT_TRUE(f.value()[i].hasValue());
271  EXPECT_EQ(i, f.value()[i].value());
272  }
273  }
274 }
275 
276 TEST(WindowExecutor, basic) {
278 
279  // int -> Future<int>
280  auto fn = [executor_ = &executor](
281  std::vector<int> input, size_t window_size, size_t expect) {
282  auto res = reduce(
283  window(
284  executor_, input, [](int i) { return makeFuture(i); }, window_size),
285  0,
286  [](int sum, const Try<int>& b) { return sum + *b; });
287  executor_->waitFor(res);
288  EXPECT_EQ(expect, std::move(res).get());
289  };
290  {
291  SCOPED_TRACE("2 in-flight at a time");
292  std::vector<int> input = {1, 2, 3};
293  fn(input, 2, 6);
294  }
295  {
296  SCOPED_TRACE("4 in-flight at a time");
297  std::vector<int> input = {1, 2, 3};
298  fn(input, 4, 6);
299  }
300  {
301  SCOPED_TRACE("empty input");
302  std::vector<int> input;
303  fn(input, 1, 0);
304  }
305  {
306  // int -> Future<Unit>
307  auto res = reduce(
308  window(
309  &executor,
310  std::vector<int>({1, 2, 3}),
311  [](int /* i */) { return makeFuture(); },
312  2),
313  0,
314  [](int sum, const Try<Unit>& b) {
315  EXPECT_TRUE(b.hasValue());
316  return sum + 1;
317  });
318  executor.waitFor(res);
319  EXPECT_EQ(3, std::move(res).get());
320  }
321  {
322  // string -> return Future<int>
323  auto res = reduce(
324  window(
325  &executor,
326  std::vector<std::string>{"1", "2", "3"},
327  [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
328  2),
329  0,
330  [](int sum, const Try<int>& b) { return sum + *b; });
331  executor.waitFor(res);
332  EXPECT_EQ(6, std::move(res).get());
333  }
334 }
335 
336 TEST(WindowExecutor, parallel) {
338 
339  std::vector<int> input;
340  std::vector<Promise<int>> ps(10);
341  for (size_t i = 0; i < ps.size(); i++) {
342  input.emplace_back(i);
343  }
344  auto f = collect(
345  window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
346 
347  std::vector<std::thread> ts;
348  boost::barrier barrier(ps.size() + 1);
349  for (size_t i = 0; i < ps.size(); i++) {
350  ts.emplace_back([&ps, &barrier, i]() {
351  barrier.wait();
352  ps[i].setValue(i);
353  });
354  }
355 
356  barrier.wait();
357 
358  for (auto& t : ts) {
359  t.join();
360  }
361 
362  executor.waitFor(f);
363  EXPECT_TRUE(f.isReady());
364  for (size_t i = 0; i < ps.size(); i++) {
365  EXPECT_EQ(i, f.value()[i]);
366  }
367 }
368 
369 TEST(WindowExecutor, parallelWithError) {
371 
372  std::vector<int> input;
373  std::vector<Promise<int>> ps(10);
374  for (size_t i = 0; i < ps.size(); i++) {
375  input.emplace_back(i);
376  }
377  auto f = collect(
378  window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
379 
380  std::vector<std::thread> ts;
381  boost::barrier barrier(ps.size() + 1);
382  for (size_t i = 0; i < ps.size(); i++) {
383  ts.emplace_back([&ps, &barrier, i]() {
384  barrier.wait();
385  if (i == (ps.size() / 2)) {
386  ps[i].setException(eggs);
387  } else {
388  ps[i].setValue(i);
389  }
390  });
391  }
392 
393  barrier.wait();
394 
395  for (auto& t : ts) {
396  t.join();
397  }
398 
399  executor.waitFor(f);
400  EXPECT_TRUE(f.isReady());
401  EXPECT_THROW(f.value(), eggs_t);
402 }
403 
404 TEST(WindowExecutor, allParallelWithError) {
406 
407  std::vector<int> input;
408  std::vector<Promise<int>> ps(10);
409  for (size_t i = 0; i < ps.size(); i++) {
410  input.emplace_back(i);
411  }
412  auto f = collectAll(
413  window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
414 
415  std::vector<std::thread> ts;
416  boost::barrier barrier(ps.size() + 1);
417  for (size_t i = 0; i < ps.size(); i++) {
418  ts.emplace_back([&ps, &barrier, i]() {
419  barrier.wait();
420  if (i == (ps.size() / 2)) {
421  ps[i].setException(eggs);
422  } else {
423  ps[i].setValue(i);
424  }
425  });
426  }
427 
428  barrier.wait();
429 
430  for (auto& t : ts) {
431  t.join();
432  }
433 
434  executor.waitFor(f);
435  EXPECT_TRUE(f.isReady());
436  for (size_t i = 0; i < ps.size(); i++) {
437  if (i == (ps.size() / 2)) {
438  EXPECT_THROW(f.value()[i].value(), eggs_t);
439  } else {
440  EXPECT_TRUE(f.value()[i].hasValue());
441  EXPECT_EQ(i, f.value()[i].value());
442  }
443  }
444 }
auto f
std::atomic< int64_t > sum(0)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void waitFor(F const &f)
makeProgress until this Future is ready.
#define SCOPED_TRACE(message)
Definition: gtest.h:2115
static void basic()
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
FutureException eggs_t
Definition: WindowTest.cpp:28
std::vector< Future< Result > > window(Collection input, F func, size_t n)
Definition: Future-inl.h:1789
bool hasException() const
Definition: Try.h:248
Future< T > reduce(It first, It last, T &&initial, F &&func)
Definition: Future-inl.h:1753
static map< string, int > m
Future< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAll(Fs &&...fs)
Definition: Future-inl.h:1477
char a
Parallel parallel(Ops ops, size_t threads=0)
Definition: Parallel.h:88
void expect(LineReader &lr, const char *expected)
Definition: Try.h:51
static eggs_t eggs("eggs")
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
Definition: Future-inl.h:1536
bool hasValue() const
Definition: Try.h:242
const char * string
Definition: Conv.cpp:212
static set< string > s
T & value()&
Definition: Try-inl.h:140
TEST(SequencedExecutor, CPUThreadPoolExecutor)
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
def next(obj)
Definition: ast.py:58