proxygen
MPMCPipelineTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/MPMCPipeline.h>
18 
19 #include <thread>
20 #include <vector>
21 
22 #include <glog/logging.h>
23 
24 #include <folly/Conv.h>
26 
27 namespace folly {
28 namespace test {
29 
30 TEST(MPMCPipeline, Trivial) {
32  EXPECT_EQ(0, a.sizeGuess());
33  a.blockingWrite(42);
34  EXPECT_EQ(1, a.sizeGuess());
35 
36  int val;
37  auto ticket = a.blockingReadStage<0>(val);
38  EXPECT_EQ(42, val);
39  EXPECT_EQ(1, a.sizeGuess());
40 
41  a.blockingWriteStage<0>(ticket, "hello world");
42  EXPECT_EQ(1, a.sizeGuess());
43 
44  std::string s;
45 
46  a.blockingRead(s);
47  EXPECT_EQ("hello world", s);
48  EXPECT_EQ(0, a.sizeGuess());
49 }
50 
51 TEST(MPMCPipeline, TrivialAmplification) {
53  EXPECT_EQ(0, a.sizeGuess());
54  a.blockingWrite(42);
55  EXPECT_EQ(2, a.sizeGuess());
56 
57  int val;
58  auto ticket = a.blockingReadStage<0>(val);
59  EXPECT_EQ(42, val);
60  EXPECT_EQ(2, a.sizeGuess());
61 
62  a.blockingWriteStage<0>(ticket, "hello world");
63  EXPECT_EQ(2, a.sizeGuess());
64  a.blockingWriteStage<0>(ticket, "goodbye");
65  EXPECT_EQ(2, a.sizeGuess());
66 
67  std::string s;
68 
69  a.blockingRead(s);
70  EXPECT_EQ("hello world", s);
71  EXPECT_EQ(1, a.sizeGuess());
72 
73  a.blockingRead(s);
74  EXPECT_EQ("goodbye", s);
75  EXPECT_EQ(0, a.sizeGuess());
76 }
77 
78 TEST(MPMCPipeline, MultiThreaded) {
79  constexpr size_t numThreadsPerStage = 6;
81 
82  std::vector<std::thread> threads;
83  threads.reserve(numThreadsPerStage * 2 + 1);
84  for (size_t i = 0; i < numThreadsPerStage; ++i) {
85  threads.emplace_back([&a] {
86  for (;;) {
87  int val;
88  auto ticket = a.blockingReadStage<0>(val);
89  if (val == -1) { // stop
90  // We still need to propagate
91  a.blockingWriteStage<0>(ticket, "");
92  break;
93  }
94  a.blockingWriteStage<0>(ticket, folly::to<std::string>(val, " hello"));
95  }
96  });
97  }
98 
99  for (size_t i = 0; i < numThreadsPerStage; ++i) {
100  threads.emplace_back([&a] {
101  for (;;) {
103  auto ticket = a.blockingReadStage<1>(val);
104  if (val.empty()) { // stop
105  // We still need to propagate
106  a.blockingWriteStage<1>(ticket, "");
107  break;
108  }
109  a.blockingWriteStage<1>(ticket, folly::to<std::string>(val, " world"));
110  }
111  });
112  }
113 
114  std::vector<std::string> results;
115  threads.emplace_back([&a, &results]() {
116  for (;;) {
118  a.blockingRead(val);
119  if (val.empty()) {
120  break;
121  }
122  results.push_back(val);
123  }
124  });
125 
126  constexpr size_t numValues = 1000;
127  for (size_t i = 0; i < numValues; ++i) {
128  a.blockingWrite(i);
129  }
130  for (size_t i = 0; i < numThreadsPerStage; ++i) {
131  a.blockingWrite(-1);
132  }
133 
134  for (auto& t : threads) {
135  t.join();
136  }
137 
138  // The consumer thread dequeued the first empty string, there should be
139  // numThreadsPerStage - 1 left.
140  EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess());
141  for (size_t i = 0; i < numThreadsPerStage - 1; ++i) {
143  a.blockingRead(val);
144  EXPECT_TRUE(val.empty());
145  }
146  {
147  std::string tmp;
148  EXPECT_FALSE(a.read(tmp));
149  }
150  EXPECT_EQ(0, a.sizeGuess());
151 
152  EXPECT_EQ(numValues, results.size());
153  for (size_t i = 0; i < results.size(); ++i) {
154  EXPECT_EQ(folly::to<std::string>(i, " hello world"), results[i]);
155  }
156 }
157 
158 } // namespace test
159 } // namespace folly
160 
161 int main(int argc, char* argv[]) {
162  testing::InitGoogleTest(&argc, argv);
163  gflags::ParseCommandLineFlags(&argc, &argv, true);
164  return RUN_ALL_TESTS();
165 }
ssize_t sizeGuess() const noexcept
Definition: MPMCPipeline.h:272
void blockingWrite(Args &&...args)
Definition: MPMCPipeline.h:194
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: gtest.h:2232
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void blockingRead(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:251
static constexpr StringPiece ticket
std::vector< std::thread::id > threads
char ** argv
char a
TEST(ProgramOptionsTest, Errors)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
bool read(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:260
const char * string
Definition: Conv.cpp:212
static set< string > s
Ticket< Stage > blockingReadStage(typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:211
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: gtest.cc:5370
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void blockingWriteStage(Ticket< Stage > &ticket, Args &&...args)
Definition: MPMCPipeline.h:243
int main(int argc, char *argv[])