proxygen
BarrierTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/futures/Barrier.h>
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <mutex>
22 
23 #include <folly/Random.h>
25 
26 #include <glog/logging.h>
27 
28 namespace folly {
29 namespace futures {
30 namespace test {
31 
32 TEST(BarrierTest, Simple) {
33  constexpr uint32_t numThreads = 10;
34 
36  std::condition_variable b1DoneCond;
37  std::condition_variable b2DoneCond;
38  std::atomic<uint32_t> b1TrueSeen(0);
39  std::atomic<uint32_t> b1Passed(0);
40  std::atomic<uint32_t> b2TrueSeen(0);
41  std::atomic<uint32_t> b2Passed(0);
42 
43  Barrier barrier(numThreads + 1);
44 
45  std::vector<std::thread> threads;
46  threads.reserve(numThreads);
47  for (uint32_t i = 0; i < numThreads; ++i) {
48  threads.emplace_back([&]() {
49  barrier.wait()
50  .then([&](bool v) {
51  std::unique_lock<std::mutex> lock(mutex);
52  b1TrueSeen += uint32_t(v);
53  if (++b1Passed == numThreads) {
54  b1DoneCond.notify_one();
55  }
56  return barrier.wait();
57  })
58  .then([&](bool v) {
59  std::unique_lock<std::mutex> lock(mutex);
60  b2TrueSeen += uint32_t(v);
61  if (++b2Passed == numThreads) {
62  b2DoneCond.notify_one();
63  }
64  })
65  .get();
66  });
67  }
68 
69  /* sleep override */
70  std::this_thread::sleep_for(std::chrono::milliseconds(50));
71  EXPECT_EQ(0, b1Passed);
72  EXPECT_EQ(0, b1TrueSeen);
73 
74  b1TrueSeen += barrier.wait().get();
75 
76  {
77  std::unique_lock<std::mutex> lock(mutex);
78  while (b1Passed != numThreads) {
79  b1DoneCond.wait(lock);
80  }
81  EXPECT_EQ(1, b1TrueSeen);
82  }
83 
84  /* sleep override */
85  std::this_thread::sleep_for(std::chrono::milliseconds(50));
86  EXPECT_EQ(0, b2Passed);
87  EXPECT_EQ(0, b2TrueSeen);
88 
89  b2TrueSeen += barrier.wait().get();
90 
91  {
92  std::unique_lock<std::mutex> lock(mutex);
93  while (b2Passed != numThreads) {
94  b2DoneCond.wait(lock);
95  }
96  EXPECT_EQ(1, b2TrueSeen);
97  }
98 
99  for (auto& t : threads) {
100  t.join();
101  }
102 }
103 
104 TEST(BarrierTest, Random) {
105  // Create numThreads threads.
106  //
107  // Each thread repeats the following numIterations times:
108  // - grab a randomly chosen number of futures from the barrier, waiting
109  // for a short random time between each
110  // - wait for all futures to complete
111  // - record whether the one future returning true was seen among them
112  //
113  // At the end, we verify that exactly one future returning true was seen
114  // for each iteration.
115  static constexpr uint32_t numIterations = 1;
116  auto numThreads = folly::Random::rand32(30, 91);
117 
118  struct ThreadInfo {
119  ThreadInfo() {}
120  std::thread thread;
121  uint32_t iteration = 0;
122  uint32_t numFutures;
123  std::vector<uint32_t> trueSeen;
124  };
125 
126  std::vector<ThreadInfo> threads;
127  threads.resize(numThreads);
128 
129  uint32_t totalFutures = 0;
130  for (auto& tinfo : threads) {
131  tinfo.numFutures = folly::Random::rand32(100);
132  tinfo.trueSeen.resize(numIterations);
133  totalFutures += tinfo.numFutures;
134  }
135 
136  Barrier barrier(totalFutures);
137 
138  for (auto& tinfo : threads) {
139  auto pinfo = &tinfo;
140  tinfo.thread = std::thread([pinfo, &barrier] {
141  std::vector<folly::Future<bool>> futures;
142  futures.reserve(pinfo->numFutures);
143  for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
144  futures.clear();
145  for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
146  futures.push_back(barrier.wait());
147  auto nanos = folly::Random::rand32(10 * 1000 * 1000);
148  /* sleep override */
149  std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
150  }
151  auto results = folly::collect(futures).get();
152  pinfo->trueSeen[i] = std::count(results.begin(), results.end(), true);
153  }
154  });
155  }
156 
157  for (auto& tinfo : threads) {
158  tinfo.thread.join();
159  EXPECT_EQ(numIterations, tinfo.iteration);
160  }
161 
162  for (uint32_t i = 0; i < numIterations; ++i) {
163  uint32_t trueCount = 0;
164  for (auto& tinfo : threads) {
165  trueCount += tinfo.trueSeen[i];
166  }
167  EXPECT_EQ(1, trueCount);
168  }
169 }
170 
171 } // namespace test
172 } // namespace futures
173 } // namespace folly
folly::Future< bool > wait()
Definition: Barrier.cpp:72
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::vector< std::thread::id > threads
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
int * count
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
Definition: Future-inl.h:1536
std::mutex mutex
TEST(BarrierTest, Simple)
Definition: BarrierTest.cpp:32
static uint32_t rand32()
Definition: Random.h:213