17 #include <boost/thread/barrier.hpp> 26 using namespace folly;
33 auto fn = [](std::vector<int> input,
size_t window_size,
size_t expect) {
44 std::vector<int> input = {1, 2, 3};
49 std::vector<int> input = {1, 2, 3};
54 std::vector<int> input;
61 std::vector<int>({1, 2, 3}),
76 std::vector<std::string>{
"1",
"2",
"3"},
78 return makeFuture<int>(folly::to<int>(
s));
90 std::vector<std::string>{
"1",
"2",
"3"},
92 return makeSemiFuture<int>(folly::to<int>(
s));
116 std::vector<int> ints = {1, 2, 3, 4};
117 std::vector<Promise<int>> ps(4);
124 throw std::runtime_error(
"exception should not kill process");
126 return ps[
i].getFuture();
145 static constexpr
size_t m = 1000;
147 static constexpr
size_t n = 1000;
149 std::vector<std::array<int, n>> ints;
151 for (
size_t i = 0;
i <
m;
i++) {
152 std::array<int, n>
next{};
154 ints.emplace_back(
next);
162 [](std::array<int, n>
i) {
166 static_cast<int64_t>(0),
168 for (
int a :
b.value()) {
178 std::vector<int> input;
179 std::vector<Promise<int>> ps(10);
180 for (
size_t i = 0;
i < ps.size();
i++) {
181 input.emplace_back(
i);
183 auto f =
collect(
window(input, [&](
int i) {
return ps[
i].getFuture(); }, 3));
185 std::vector<std::thread> ts;
186 boost::barrier barrier(ps.size() + 1);
187 for (
size_t i = 0; i < ps.size(); i++) {
188 ts.emplace_back([&ps, &barrier, i]() {
201 for (
size_t i = 0; i < ps.size(); i++) {
206 TEST(Window, parallelWithError) {
207 std::vector<int> input;
208 std::vector<Promise<int>> ps(10);
209 for (
size_t i = 0;
i < ps.size();
i++) {
210 input.emplace_back(
i);
212 auto f =
collect(
window(input, [&](
int i) {
return ps[
i].getFuture(); }, 3));
214 std::vector<std::thread> ts;
215 boost::barrier barrier(ps.size() + 1);
216 for (
size_t i = 0; i < ps.size(); i++) {
217 ts.emplace_back([&ps, &barrier, i]() {
219 if (i == (ps.size() / 2)) {
220 ps[
i].setException(
eggs);
237 TEST(Window, allParallelWithError) {
238 std::vector<int> input;
239 std::vector<Promise<int>> ps(10);
240 for (
size_t i = 0;
i < ps.size();
i++) {
241 input.emplace_back(
i);
246 std::vector<std::thread> ts;
247 boost::barrier barrier(ps.size() + 1);
248 for (
size_t i = 0; i < ps.size(); i++) {
249 ts.emplace_back([&ps, &barrier, i]() {
251 if (i == (ps.size() / 2)) {
252 ps[
i].setException(
eggs);
266 for (
size_t i = 0; i < ps.size(); i++) {
267 if (i == (ps.size() / 2)) {
281 std::vector<int> input,
size_t window_size,
size_t expect) {
284 executor_, input, [](
int i) {
return makeFuture(i); }, window_size),
287 executor_->waitFor(res);
292 std::vector<int> input = {1, 2, 3};
297 std::vector<int> input = {1, 2, 3};
302 std::vector<int> input;
310 std::vector<int>({1, 2, 3}),
326 std::vector<std::string>{
"1",
"2",
"3"},
327 [](
std::string s) {
return makeFuture<int>(folly::to<int>(
s)); },
339 std::vector<int> input;
340 std::vector<Promise<int>> ps(10);
341 for (
size_t i = 0;
i < ps.size();
i++) {
342 input.emplace_back(
i);
345 window(&executor, input, [&](
int i) {
return ps[
i].getFuture(); }, 3));
347 std::vector<std::thread> ts;
348 boost::barrier barrier(ps.size() + 1);
349 for (
size_t i = 0; i < ps.size(); i++) {
350 ts.emplace_back([&ps, &barrier, i]() {
364 for (
size_t i = 0; i < ps.size(); i++) {
369 TEST(WindowExecutor, parallelWithError) {
372 std::vector<int> input;
373 std::vector<Promise<int>> ps(10);
374 for (
size_t i = 0;
i < ps.size();
i++) {
375 input.emplace_back(
i);
378 window(&executor, input, [&](
int i) {
return ps[
i].getFuture(); }, 3));
380 std::vector<std::thread> ts;
381 boost::barrier barrier(ps.size() + 1);
382 for (
size_t i = 0; i < ps.size(); i++) {
383 ts.emplace_back([&ps, &barrier, i]() {
385 if (i == (ps.size() / 2)) {
386 ps[
i].setException(
eggs);
404 TEST(WindowExecutor, allParallelWithError) {
407 std::vector<int> input;
408 std::vector<Promise<int>> ps(10);
409 for (
size_t i = 0;
i < ps.size();
i++) {
410 input.emplace_back(
i);
413 window(&executor, input, [&](
int i) {
return ps[
i].getFuture(); }, 3));
415 std::vector<std::thread> ts;
416 boost::barrier barrier(ps.size() + 1);
417 for (
size_t i = 0; i < ps.size(); i++) {
418 ts.emplace_back([&ps, &barrier, i]() {
420 if (i == (ps.size() / 2)) {
421 ps[
i].setException(
eggs);
436 for (
size_t i = 0; i < ps.size(); i++) {
437 if (i == (ps.size() / 2)) {
std::atomic< int64_t > sum(0)
#define EXPECT_THROW(statement, expected_exception)
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
void waitFor(F const &f)
makeProgress until this Future is ready.
#define SCOPED_TRACE(message)
—— Concurrent Priority Queue Implementation ——
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
std::vector< Future< Result > > window(Collection input, F func, size_t n)
bool hasException() const
Future< T > reduce(It first, It last, T &&initial, F &&func)
static map< string, int > m
Future< std::tuple< Try< typename remove_cvref_t< Fs >::value_type >... > > collectAll(Fs &&...fs)
Parallel parallel(Ops ops, size_t threads=0)
void expect(LineReader &lr, const char *expected)
static eggs_t eggs("eggs")
#define EXPECT_TRUE(condition)
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
TEST(SequencedExecutor, CPUThreadPoolExecutor)
Future< typename std::decay< T >::type > makeFuture(T &&t)