proxygen
PipelineTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <wangle/channel/Handler.h>
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
25 #include <boost/thread/barrier.hpp>
26 
27 using namespace folly;
28 using namespace wangle;
29 using namespace testing;
30 
32 class IntHandler2 : public StrictMock<MockHandlerAdapter<int, int>> {};
33 
34 ACTION(FireRead) {
35  arg0->fireRead(arg1);
36 }
37 
38 ACTION(FireReadEOF) {
39  arg0->fireReadEOF();
40 }
41 
42 ACTION(FireReadException) {
43  arg0->fireReadException(arg1);
44 }
45 
46 ACTION(FireWrite) {
47  arg0->fireWrite(arg1);
48 }
49 
50 ACTION(FireClose) {
51  arg0->fireClose();
52 }
53 
54 // Test move only types, among other things
55 TEST(PipelineTest, RealHandlersCompile) {
56  EventBase eb;
57  auto socket = AsyncSocket::newSocket(&eb);
58  // static
59  {
62  OutputBufferingHandler>::create(
63  AsyncSocketHandler(socket),
65  EXPECT_TRUE(pipeline->getHandler<AsyncSocketHandler>());
66  EXPECT_TRUE(pipeline->getHandler<OutputBufferingHandler>());
67  }
68  // dynamic
69  {
70  auto pipeline = Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>>::create();
71  (*pipeline)
72  .addBack(AsyncSocketHandler(socket))
73  .addBack(OutputBufferingHandler())
74  .finalize();
75  EXPECT_TRUE(pipeline->getHandler<AsyncSocketHandler>());
76  EXPECT_TRUE(pipeline->getHandler<OutputBufferingHandler>());
77  }
78 }
79 
80 // Test that handlers correctly fire the next handler when directed
81 TEST(PipelineTest, FireActions) {
82  IntHandler handler1;
83  IntHandler2 handler2;
84 
85  {
86  InSequence sequence;
87  EXPECT_CALL(handler2, attachPipeline(_));
88  EXPECT_CALL(handler1, attachPipeline(_));
89  }
90 
92  &handler1, &handler2);
93 
94  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
95  EXPECT_CALL(handler2, read_(_, _)).Times(1);
96  pipeline->read(1);
97 
98  EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF());
99  EXPECT_CALL(handler2, readEOF(_)).Times(1);
100  pipeline->readEOF();
101 
102  EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException());
103  EXPECT_CALL(handler2, readException(_, _)).Times(1);
104  pipeline->readException(make_exception_wrapper<std::runtime_error>("blah"));
105 
106  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
107  EXPECT_CALL(handler1, write_(_, _)).Times(1);
108  EXPECT_NO_THROW(pipeline->write(1).value());
109 
110  EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose());
111  EXPECT_CALL(handler1, close_(_)).Times(1);
112  EXPECT_NO_THROW(pipeline->close().value());
113 
114  {
115  InSequence sequence;
116  EXPECT_CALL(handler1, detachPipeline(_));
117  EXPECT_CALL(handler2, detachPipeline(_));
118  }
119 }
120 
121 // Test that nothing bad happens when actions reach the end of the pipeline
122 // (a warning will be logged, however)
123 TEST(PipelineTest, ReachEndOfPipeline) {
125  EXPECT_CALL(handler, attachPipeline(_));
126  auto pipeline = StaticPipeline<int, int, IntHandler>::create(&handler);
127 
128  EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
129  pipeline->read(1);
130 
131  EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF());
132  pipeline->readEOF();
133 
134  EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException());
135  pipeline->readException(make_exception_wrapper<std::runtime_error>("blah"));
136 
137  EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite());
138  EXPECT_NO_THROW(pipeline->write(1).value());
139 
140  EXPECT_CALL(handler, close_(_)).WillOnce(FireClose());
141  EXPECT_NO_THROW(pipeline->close().value());
142 
143  EXPECT_CALL(handler, detachPipeline(_));
144 }
145 
146 // Test having the last read handler turn around and write
147 TEST(PipelineTest, TurnAround) {
148  IntHandler handler1;
149  IntHandler2 handler2;
150 
151  {
152  InSequence sequence;
153  EXPECT_CALL(handler2, attachPipeline(_));
154  EXPECT_CALL(handler1, attachPipeline(_));
155  }
156 
158  &handler1, &handler2);
159 
160  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
161  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite());
162  EXPECT_CALL(handler1, write_(_, _)).Times(1);
163  pipeline->read(1);
164 
165  {
166  InSequence sequence;
167  EXPECT_CALL(handler1, detachPipeline(_));
168  EXPECT_CALL(handler2, detachPipeline(_));
169  }
170 }
171 
172 TEST(PipelineTest, DynamicFireActions) {
173  IntHandler handler1, handler2, handler3;
174  EXPECT_CALL(handler2, attachPipeline(_));
175  auto pipeline = StaticPipeline<int, int, IntHandler>::create(&handler2);
176 
177 
178  {
179  InSequence sequence;
180  EXPECT_CALL(handler3, attachPipeline(_));
181  EXPECT_CALL(handler1, attachPipeline(_));
182  }
183 
184  (*pipeline)
185  .addFront(&handler1)
186  .addBack(&handler3)
187  .finalize();
188 
189  EXPECT_TRUE(pipeline->getHandler<IntHandler>(0));
190  EXPECT_TRUE(pipeline->getHandler<IntHandler>(1));
191  EXPECT_TRUE(pipeline->getHandler<IntHandler>(2));
192 
193  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
194  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
195  EXPECT_CALL(handler3, read_(_, _)).Times(1);
196  pipeline->read(1);
197 
198  EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite());
199  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
200  EXPECT_CALL(handler1, write_(_, _)).Times(1);
201  EXPECT_NO_THROW(pipeline->write(1).value());
202 
203  {
204  InSequence sequence;
205  EXPECT_CALL(handler1, detachPipeline(_));
206  EXPECT_CALL(handler2, detachPipeline(_));
207  EXPECT_CALL(handler3, detachPipeline(_));
208  }
209 }
210 
211 TEST(PipelineTest, DynamicAttachDetachOrder) {
212  IntHandler handler1, handler2;
213  auto pipeline = Pipeline<int, int>::create();
214  {
215  InSequence sequence;
216  EXPECT_CALL(handler2, attachPipeline(_));
217  EXPECT_CALL(handler1, attachPipeline(_));
218  }
219  (*pipeline)
220  .addBack(&handler1)
221  .addBack(&handler2)
222  .finalize();
223  {
224  InSequence sequence;
225  EXPECT_CALL(handler1, detachPipeline(_));
226  EXPECT_CALL(handler2, detachPipeline(_));
227  }
228 }
229 
230 TEST(PipelineTest, GetContext) {
232  EXPECT_CALL(handler, attachPipeline(_));
233  auto pipeline = StaticPipeline<int, int, IntHandler>::create(&handler);
234  EXPECT_TRUE(handler.getContext());
235  EXPECT_CALL(handler, detachPipeline(_));
236 }
237 
238 TEST(PipelineTest, HandlerInMultiplePipelines) {
240  EXPECT_CALL(handler, attachPipeline(_)).Times(2);
241  auto pipeline1 = StaticPipeline<int, int, IntHandler>::create(&handler);
242  auto pipeline2 = StaticPipeline<int, int, IntHandler>::create(&handler);
243  EXPECT_FALSE(handler.getContext());
244  EXPECT_CALL(handler, detachPipeline(_)).Times(2);
245 }
246 
247 TEST(PipelineTest, HandlerInPipelineTwice) {
248  auto handler = std::make_shared<IntHandler>();
249  EXPECT_CALL(*handler, attachPipeline(_)).Times(2);
250  auto pipeline = Pipeline<int, int>::create();
251  pipeline->addBack(handler);
252  pipeline->addBack(handler);
253  pipeline->finalize();
254  EXPECT_FALSE(handler->getContext());
255  EXPECT_CALL(*handler, detachPipeline(_)).Times(2);
256 }
257 
258 TEST(PipelineTest, NoDetachOnOwner) {
260  EXPECT_CALL(handler, attachPipeline(_));
261  auto pipeline = StaticPipeline<int, int, IntHandler>::create(&handler);
262  pipeline->setOwner(&handler);
263 }
264 
265 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
266 class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
268  public:
269  void read(Context*, Rin /* msg */) override {}
270  Future<Unit> write(Context*, Win /* msg */) override { return makeFuture(); }
271 };
272 
276 
277 TEST(Pipeline, MissingInboundOrOutbound) {
278  auto pipeline = Pipeline<int, int>::create();
279  (*pipeline)
281  .finalize();
282  EXPECT_THROW(pipeline->read(0), std::invalid_argument);
283  EXPECT_THROW(pipeline->readEOF(), std::invalid_argument);
284  EXPECT_THROW(
285  pipeline->readException(exception_wrapper(std::runtime_error("blah"))),
286  std::invalid_argument);
287  EXPECT_THROW(pipeline->write(0), std::invalid_argument);
288  EXPECT_THROW(pipeline->close(), std::invalid_argument);
289 }
290 
291 TEST(Pipeline, DynamicConstruction) {
292  {
294  pipeline->addBack(StringHandler());
295  pipeline->addBack(StringHandler());
296 
297  // Exercise both addFront and addBack. Final pipeline is
298  // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS
300  (*pipeline)
301  .addFront(IntToStringHandler{})
302  .addFront(StringToIntHandler{})
303  .addBack(StringToIntHandler{})
304  .addBack(IntToStringHandler{})
305  .finalize());
306  }
307 }
308 
309 TEST(Pipeline, RemovePointer) {
310  IntHandler handler1, handler2;
311  EXPECT_CALL(handler1, attachPipeline(_));
312  EXPECT_CALL(handler2, attachPipeline(_));
313  auto pipeline = Pipeline<int, int>::create();
314  (*pipeline)
315  .addBack(&handler1)
316  .addBack(&handler2)
317  .finalize();
318 
319  EXPECT_CALL(handler1, detachPipeline(_));
320  (*pipeline)
321  .remove(&handler1)
322  .finalize();
323 
324  EXPECT_CALL(handler2, read_(_, _));
325  pipeline->read(1);
326 
327  EXPECT_CALL(handler2, detachPipeline(_));
328 }
329 
330 TEST(Pipeline, RemoveFront) {
331  IntHandler handler1, handler2;
332  EXPECT_CALL(handler1, attachPipeline(_));
333  EXPECT_CALL(handler2, attachPipeline(_));
334  auto pipeline = Pipeline<int, int>::create();
335  (*pipeline)
336  .addBack(&handler1)
337  .addBack(&handler2)
338  .finalize();
339 
340  EXPECT_CALL(handler1, detachPipeline(_));
341  (*pipeline)
342  .removeFront()
343  .finalize();
344 
345  EXPECT_CALL(handler2, read_(_, _));
346  pipeline->read(1);
347 
348  EXPECT_CALL(handler2, detachPipeline(_));
349 }
350 
351 TEST(Pipeline, RemoveBack) {
352  IntHandler handler1, handler2;
353  EXPECT_CALL(handler1, attachPipeline(_));
354  EXPECT_CALL(handler2, attachPipeline(_));
355  auto pipeline = Pipeline<int, int>::create();
356  (*pipeline)
357  .addBack(&handler1)
358  .addBack(&handler2)
359  .finalize();
360 
361  EXPECT_CALL(handler2, detachPipeline(_));
362  (*pipeline)
363  .removeBack()
364  .finalize();
365 
366  EXPECT_CALL(handler1, read_(_, _));
367  pipeline->read(1);
368 
369  EXPECT_CALL(handler1, detachPipeline(_));
370 }
371 
372 TEST(Pipeline, RemoveType) {
373  IntHandler handler1;
374  IntHandler2 handler2;
375  EXPECT_CALL(handler1, attachPipeline(_));
376  EXPECT_CALL(handler2, attachPipeline(_));
377  auto pipeline = Pipeline<int, int>::create();
378  (*pipeline)
379  .addBack(&handler1)
380  .addBack(&handler2)
381  .finalize();
382 
383  EXPECT_CALL(handler1, detachPipeline(_));
384  (*pipeline)
385  .remove<IntHandler>()
386  .finalize();
387 
388  EXPECT_CALL(handler2, read_(_, _));
389  pipeline->read(1);
390 
391  EXPECT_CALL(handler2, detachPipeline(_));
392 }
393 
394 // Pipeline lifetime used to be managed by DelayedDestruction, which is not
395 // thread safe. This test would fail. Mandatory shared_ptr ownership fixes the
396 // issue.
397 TEST(Pipeline, Concurrent) {
398  NiceMock<MockHandlerAdapter<int, int>> handler1, handler2;
399  auto pipeline = Pipeline<int, int>::create();
400  (*pipeline)
401  .addBack(&handler1)
402  .addBack(&handler2)
403  .finalize();
404  boost::barrier b{2};
405  auto spam = [&]{
406  for (int i = 0; i < 100000; i++) {
407  b.wait();
408  pipeline->read(i);
409  }
410  };
411  std::thread t(spam);
412  spam();
413  t.join();
414 }
415 
416 TEST(PipelineTest, NumHandler) {
417  NiceMock<MockHandlerAdapter<int, int>> handler1, handler2;
418  auto pipeline = Pipeline<int, int>::create();
419  EXPECT_EQ(0, pipeline->numHandlers());
420 
421  pipeline->addBack(&handler1);
422  EXPECT_EQ(1, pipeline->numHandlers());
423 
424  pipeline->addBack(&handler2);
425  EXPECT_EQ(2, pipeline->numHandlers());
426 
427  pipeline->finalize();
428  EXPECT_EQ(2, pipeline->numHandlers());
429 
430  pipeline->remove(&handler1);
431  EXPECT_EQ(1, pipeline->numHandlers());
432 
433  pipeline->remove(&handler2);
434  EXPECT_EQ(0, pipeline->numHandlers());
435 }
436 
437 
438 TEST(PipelineTest, HandlerReuse) {
439  NiceMock<MockHandlerAdapter<int, int>> handler1, handler2, handler3;
440  auto pipeline1 = Pipeline<int, int>::create();
441 
442  // pipeline1 contains the first two handlers
443  (*pipeline1)
444  .addBack(&handler1)
445  .addBack(&handler2)
446  .finalize();
447  pipeline1->read(42);
448  EXPECT_NE(nullptr, handler2.getContext());
449 
450  // Close and detach the back handler (#2)
451  pipeline1->close();
452  pipeline1->removeBack();
453  ASSERT_EQ(nullptr, handler2.getContext());
454 
455  auto pipeline2 = Pipeline<int, int>::create();
456  (*pipeline2)
457  .addBack(&handler2)
458  .addBack(&handler3)
459  .finalize();
460  pipeline2->read(24);
461  EXPECT_NE(nullptr, handler2.getContext());
462 
463  // detach both
464  pipeline2->remove(&handler2);
465  pipeline2->remove(&handler3);
466  ASSERT_EQ(nullptr, handler2.getContext());
467  ASSERT_EQ(nullptr, handler3.getContext());
468 
469  auto pipeline3 = Pipeline<int, int>::create();
470  (*pipeline3)
471  .addBack(&handler2)
472  .addBack(&handler3)
473  .finalize();
474  pipeline3->read(1);
475  EXPECT_NE(nullptr, handler2.getContext());
476 }
ACTION(FireRead)
#define EXPECT_NO_THROW(statement)
Definition: gtest.h:1845
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void handler(int, siginfo_t *, void *)
ConcreteHandler< std::string, int > StringToIntHandler
Handler< Rin, Rout, Win, Wout >::Context Context
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
HandlerAdapter< std::string, std::string > StringHandler
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
Definition: AsyncSocket.h:281
ConcreteHandler< int, std::string > IntToStringHandler
Future< Unit > write(Context *, Win) override
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
TEST(SequencedExecutor, CPUThreadPoolExecutor)
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
StrictMock< MockHandlerAdapter< int, int > > IntHandler
void read(Context *, Rin) override