proxygen
ObservingHandlerTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
19 
20 using namespace wangle;
21 using namespace folly;
22 using namespace testing;
23 
24 class ObservingHandlerTest : public Test {
25  public:
27  public:
28  std::unique_ptr<IOBuf> encode(int& data) override {
29  return IOBuf::copyBuffer(folly::to<std::string>(data));
30  }
31  };
32 
33  void SetUp() override {
34  prevHandler = new StrictMock<MockBytesToBytesHandler>();
35  observingHandler = new StrictMock<MockObservingHandler>(&pool);
36  broadcastHandler = std::make_unique<StrictMock<MockBroadcastHandler>>();
37 
38  pipeline = ObservingPipeline<int>::create();
39  pipeline->addBack(
40  std::shared_ptr<StrictMock<MockBytesToBytesHandler>>(prevHandler));
41  pipeline->addBack(MockIntToByteEncoder());
42  pipeline->addBack(
43  std::shared_ptr<StrictMock<MockObservingHandler>>(observingHandler));
44  pipeline->finalize();
45  }
46 
47  void TearDown() override {
48  Mock::VerifyAndClear(broadcastHandler.get());
49 
50  broadcastHandler.reset();
51  pipeline.reset();
52  }
53 
54  protected:
56 
57  StrictMock<MockBytesToBytesHandler>* prevHandler{nullptr};
58  StrictMock<MockObservingHandler>* observingHandler{nullptr};
59  std::unique_ptr<StrictMock<MockBroadcastHandler>> broadcastHandler;
60 
62 };
63 
66 
67  EXPECT_CALL(*prevHandler, transportActive(_))
68  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
69  ctx->fireTransportActive();
70  }));
71  // Verify that ingress is paused
72  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
73  EXPECT_CALL(pool, mockGetHandler(_))
75  broadcastHandler.get())));
76  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
77  // Verify that ingress is resumed
78  EXPECT_CALL(*prevHandler, transportActive(_))
79  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
80  ctx->fireTransportActive();
81  }));
82 
83  // Initialize the pipeline
84  pipeline->transportActive();
85 
86  EXPECT_CALL(*observingHandler, mockWrite(_, 1))
87  .WillOnce(Return(makeMoveWrapper(makeFuture())));
88  EXPECT_CALL(*observingHandler, mockWrite(_, 2))
89  .WillOnce(Return(makeMoveWrapper(makeFuture())));
90 
91  // Broadcast some data
92  observingHandler->onNext(1);
93  observingHandler->onNext(2);
94 
95  EXPECT_CALL(*observingHandler, mockClose(_))
96  .WillOnce(Return(makeMoveWrapper(makeFuture())));
97 
98  // Finish the broadcast
99  observingHandler->onCompleted();
100 }
101 
102 TEST_F(ObservingHandlerTest, ConnectError) {
103  // Test when an error occurs while fetching the broadcast handler
105 
106  EXPECT_CALL(*prevHandler, transportActive(_))
107  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
108  ctx->fireTransportActive();
109  }));
110  // Verify that ingress is paused
111  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
112  // Inject error
113  EXPECT_CALL(pool, mockGetHandler(_))
115  make_exception_wrapper<std::exception>())));
116  EXPECT_CALL(*observingHandler, mockClose(_))
117  .WillOnce(Return(makeMoveWrapper(makeFuture())));
118 
119  // Initialize the pipeline
120  pipeline->transportActive();
121 }
122 
123 TEST_F(ObservingHandlerTest, ConnectHandlerDeletion) {
124  // Test when the handler goes away before the broadcast handler
125  // is obtained
127 
128  EXPECT_CALL(*prevHandler, transportActive(_))
129  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
130  ctx->fireTransportActive();
131  }));
132  // Verify that ingress is paused
133  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
135  EXPECT_CALL(pool, mockGetHandler(_))
136  .WillOnce(Return(makeMoveWrapper(promise.getFuture())));
137 
138  // Initialize the pipeline
139  pipeline->transportActive();
140 
141  // Delete the handler and then fulfil the promise
142  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(0);
143  pipeline.reset();
144  promise.setValue(broadcastHandler.get());
145 }
146 
147 TEST_F(ObservingHandlerTest, ConnectErrorHandlerDeletion) {
148  // Test when an error occurs while fetching the broadcast handler
149  // after the handler is deleted
151 
152  EXPECT_CALL(*prevHandler, transportActive(_))
153  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
154  ctx->fireTransportActive();
155  }));
156  // Verify that ingress is paused
157  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
159  EXPECT_CALL(pool, mockGetHandler(_))
160  .WillOnce(Return(makeMoveWrapper(promise.getFuture())));
161 
162  // Initialize the pipeline
163  pipeline->transportActive();
164 
165  // Delete the handler and then inject an error
166  pipeline.reset();
167  promise.setException(std::exception());
168 }
169 
170 TEST_F(ObservingHandlerTest, BroadcastError) {
172 
173  EXPECT_CALL(*prevHandler, transportActive(_))
174  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
175  ctx->fireTransportActive();
176  }));
177  // Verify that ingress is paused
178  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
179  EXPECT_CALL(pool, mockGetHandler(_))
181  broadcastHandler.get())));
182  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
183  // Verify that ingress is resumed
184  EXPECT_CALL(*prevHandler, transportActive(_))
185  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
186  ctx->fireTransportActive();
187  }));
188 
189  // Initialize the pipeline
190  pipeline->transportActive();
191 
192  EXPECT_CALL(*observingHandler, mockWrite(_, _))
193  .WillOnce(Return(makeMoveWrapper(makeFuture())));
194 
195  // Broadcast some data
196  observingHandler->onNext(1);
197 
198  EXPECT_CALL(*observingHandler, mockClose(_))
199  .WillOnce(Return(makeMoveWrapper(makeFuture())));
200 
201  // Inject broadcast error
202  observingHandler->onError(make_exception_wrapper<std::exception>());
203 }
204 
207 
208  EXPECT_CALL(*prevHandler, transportActive(_))
209  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
210  ctx->fireTransportActive();
211  }));
212  // Verify that ingress is paused
213  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
214  EXPECT_CALL(pool, mockGetHandler(_))
216  broadcastHandler.get())));
217  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
218  // Verify that ingress is resumed
219  EXPECT_CALL(*prevHandler, transportActive(_))
220  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
221  ctx->fireTransportActive();
222  }));
223 
224  // Initialize the pipeline
225  pipeline->transportActive();
226 
227  EXPECT_CALL(*observingHandler, mockWrite(_, _))
228  .WillOnce(Return(makeMoveWrapper(makeFuture())));
229 
230  // Broadcast some data
231  observingHandler->onNext(1);
232 
233  EXPECT_CALL(*observingHandler, mockClose(_))
234  .WillOnce(InvokeWithoutArgs([&] {
235  // Delete the pipeline
236  pipeline.reset();
237  return makeMoveWrapper(makeFuture());
238  }));
239  EXPECT_CALL(*broadcastHandler, unsubscribe(_)).Times(1);
240 
241  // Client closes connection
242  observingHandler->readEOF(nullptr);
243 }
244 
247 
248  EXPECT_CALL(*prevHandler, transportActive(_))
249  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
250  ctx->fireTransportActive();
251  }));
252  // Verify that ingress is paused
253  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
254  EXPECT_CALL(pool, mockGetHandler(_))
256  broadcastHandler.get())));
257  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
258  // Verify that ingress is resumed
259  EXPECT_CALL(*prevHandler, transportActive(_))
260  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
261  ctx->fireTransportActive();
262  }));
263 
264  // Initialize the pipeline
265  pipeline->transportActive();
266 
267  EXPECT_CALL(*observingHandler, mockWrite(_, _))
268  .WillOnce(Return(makeMoveWrapper(makeFuture())));
269 
270  // Broadcast some data
271  observingHandler->onNext(1);
272 
273  EXPECT_CALL(*observingHandler, mockClose(_))
274  .WillOnce(InvokeWithoutArgs([&] {
275  // Delete the pipeline
276  pipeline.reset();
277  return makeMoveWrapper(makeFuture());
278  }));
279  EXPECT_CALL(*broadcastHandler, unsubscribe(_)).Times(1);
280 
281  // Inject read error
282  observingHandler->readException(nullptr,
283  make_exception_wrapper<std::exception>());
284 }
285 
288 
289  EXPECT_CALL(*prevHandler, transportActive(_))
290  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
291  ctx->fireTransportActive();
292  }));
293  // Verify that ingress is paused
294  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
295  EXPECT_CALL(pool, mockGetHandler(_))
297  broadcastHandler.get())));
298  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
299  // Verify that ingress is resumed
300  EXPECT_CALL(*prevHandler, transportActive(_))
301  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
302  ctx->fireTransportActive();
303  }));
304 
305  // Initialize the pipeline
306  pipeline->transportActive();
307 
308  // Inject write error
309  EXPECT_CALL(*observingHandler, mockWrite(_, _))
310  .WillOnce(Return(
311  MoveWrapper<Future<Unit>>(make_exception_wrapper<std::exception>())));
312  EXPECT_CALL(*observingHandler, mockClose(_))
313  .WillOnce(InvokeWithoutArgs([&] {
314  // Delete the pipeline
315  pipeline.reset();
316  return makeMoveWrapper(makeFuture());
317  }));
318  EXPECT_CALL(*broadcastHandler, unsubscribe(_)).Times(1);
319 
320  // Broadcast some data
321  observingHandler->onNext(1);
322 }
323 
324 TEST_F(ObservingHandlerTest, WriteErrorHandlerDeletion) {
325  // Test when write error occurs asynchronously after the handler
326  // has been deleted.
328 
329  EXPECT_CALL(*prevHandler, transportActive(_))
330  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
331  ctx->fireTransportActive();
332  }));
333  // Verify that ingress is paused
334  EXPECT_CALL(*prevHandler, transportInactive(_)).WillOnce(Return());
335  EXPECT_CALL(pool, mockGetHandler(_))
337  broadcastHandler.get())));
338  EXPECT_CALL(*broadcastHandler, subscribe(_)).Times(1);
339  // Verify that ingress is resumed
340  EXPECT_CALL(*prevHandler, transportActive(_))
341  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
342  ctx->fireTransportActive();
343  }));
344 
345  // Initialize the pipeline
346  pipeline->transportActive();
347 
348  Promise<Unit> promise;
349  EXPECT_CALL(*observingHandler, mockWrite(_, _))
350  .WillOnce(Return(makeMoveWrapper(promise.getFuture())));
351 
352  // Broadcast some data
353  observingHandler->onNext(1);
354 
355  // Delete the pipeline and then fail the write
356  EXPECT_CALL(*broadcastHandler, unsubscribe(_)).Times(1);
357  pipeline.reset();
358  promise.setException(std::exception());
359 }
ObservingPipeline< int >::Ptr pipeline
StrictMock< MockBroadcastPool > pool
void subscribe(uint32_t iters, int N)
Definition: RxBenchmark.cpp:54
std::unique_ptr< IOBuf > encode(int &data) override
void setException(exception_wrapper ew)
Definition: Promise-inl.h:111
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
PolymorphicAction< internal::InvokeWithoutArgsAction< FunctionImpl > > InvokeWithoutArgs(FunctionImpl function_impl)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void dummy()
Future< T > getFuture()
Definition: Promise-inl.h:97
MoveWrapper< T0 > makeMoveWrapper(T &&t)
Definition: MoveWrapper.h:82
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
Definition: Promise.h:326
std::unique_ptr< StrictMock< MockBroadcastHandler > > broadcastHandler
static Ptr create()
Definition: Pipeline.h:174
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
Handler< R, R, W, W >::Context Context
Definition: Handler.h:161
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
internal::ReturnAction< R > Return(R value)
TEST_F(AcceptorTest, TestCanAcceptWithNoConnectionCounter)