proxygen
MockCodecDownstreamTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 #include <folly/io/Cursor.h>
25 #include <sstream>
26 #include <string>
28 #include <vector>
29 #include <boost/optional/optional_io.hpp>
30 #include <fizz/record/Extensions.h>
31 #include <fizz/record/Types.h>
32 
34 
35 using namespace wangle;
36 using namespace fizz;
37 using namespace folly;
38 using namespace proxygen;
39 using namespace std;
40 using namespace testing;
41 
// NOTE(review): the declaration lines that open these three brace
// initializers are not present in this listing; only the entries and the
// closing braces survive. One of them is presumably kDefaultIngressSettings,
// which the fixture returns from getIngressSettings() — confirm against the
// full file.
43  { SettingsId::INITIAL_WINDOW_SIZE, 65536 }
44 };
46  { SettingsId::SETTINGS_HTTP_CERT_AUTH, 128}
47 };
49  { SettingsId::SETTINGS_HTTP_CERT_AUTH, 128}
50 };
51 
// Fixture constructor. The line carrying the constructor's name and part of
// the member-initializer list are absent from this listing. It installs
// default gmock expectations on the mock transport, the mock controller and
// the mock codec, then creates the HTTPDownstreamSession under test, starts
// it, and runs the event loop once.
53  public:
55  : eventBase_(),
58  transactionTimeouts_(makeInternalTimeoutSet(&eventBase_)) {
59 
// Transport defaults: writes are routed to onWriteChain(), good() reports
// transportGood_, and closeNow() flips transportGood_ to false.
60  EXPECT_CALL(*transport_, writeChain(_, _, _))
61  .WillRepeatedly(Invoke(this, &MockCodecDownstreamTest::onWriteChain));
62  EXPECT_CALL(*transport_, good())
63  .WillRepeatedly(ReturnPointee(&transportGood_));
64  EXPECT_CALL(*transport_, closeNow())
65  .WillRepeatedly(Assign(&transportGood_, false));
// NOTE(review): the EXPECT_CALL line that this action belongs to is missing
// from the listing; it returns the fixture's event base.
67  .WillRepeatedly(Return(&eventBase_));
// Remember the read callback registered on the transport so tests can push
// ingress through it (see onIngressImpl).
68  EXPECT_CALL(*transport_, setReadCB(_))
69  .WillRepeatedly(SaveArg<0>(&transportCb_));
70  EXPECT_CALL(mockController_, getGracefulShutdownTimeout())
71  .WillRepeatedly(Return(std::chrono::milliseconds(0)));
72  EXPECT_CALL(mockController_, attachSession(_));
// Capture the callback registered on the codec; tests drive the session by
// invoking codecCallback_ directly.
73  EXPECT_CALL(*codec_, setCallback(_))
74  .WillRepeatedly(SaveArg<0>(&codecCallback_));
75  EXPECT_CALL(*codec_, supportsParallelRequests())
76  .WillRepeatedly(Return(true));
77  EXPECT_CALL(*codec_, supportsPushTransactions())
78  .WillRepeatedly(Return(true));
79  EXPECT_CALL(*codec_, getTransportDirection())
80  .WillRepeatedly(Return(TransportDirection::DOWNSTREAM));
81  EXPECT_CALL(*codec_, getEgressSettings());
82  EXPECT_CALL(*codec_, supportsStreamFlowControl())
83  .WillRepeatedly(Return(true));
84  EXPECT_CALL(*codec_, getProtocol())
85  .WillRepeatedly(Return(CodecProtocol::SPDY_3_1));
86  EXPECT_CALL(*codec_, getUserAgent())
87  .WillRepeatedly(ReturnRef(userAgent_));
88  EXPECT_CALL(*codec_, setParserPaused(_))
89  .WillRepeatedly(Return());
90  EXPECT_CALL(*codec_, supportsSessionFlowControl())
91  .WillRepeatedly(Return(true)); // simulate spdy 3.1
92  EXPECT_CALL(*codec_, getIngressSettings())
93  .WillRepeatedly(Return(&kDefaultIngressSettings));
// isReusable()/isWaitingToDrain() read the fixture flags at call time, so the
// generateGoaway() stand-in below can change what they report.
94  EXPECT_CALL(*codec_, isReusable())
95  .WillRepeatedly(ReturnPointee(&reusable_));
96  EXPECT_CALL(*codec_, isWaitingToDrain())
97  .WillRepeatedly(ReturnPointee(&drainPending_));
98  EXPECT_CALL(*codec_, generateSettings(_));
99  EXPECT_CALL(*codec_, getDefaultWindowSize())
100  .WillRepeatedly(Return(65536));
// Each createStream() call advances pushStreamID_ by 2 and returns the new
// value; pushStreamID_ starts at 0, so the IDs handed out are 2, 4, 6, ...
101  EXPECT_CALL(*codec_, createStream())
102  .WillRepeatedly(InvokeWithoutArgs([&] {
103  return pushStreamID_ += 2;
104  }));
105  EXPECT_CALL(*codec_, enableDoubleGoawayDrain())
106  .WillRepeatedly(Invoke([&] { doubleGoaway_ = true; }));
// generateGoaway() stand-in:
//  - first call (reusable_ still true): mark the codec non-reusable and leave
//    a drain pending only if double-GOAWAY draining was enabled;
//  - later call with no drain pending: return 0;
//  - later call with a drain pending: clear it.
// When it does not return early it returns 1, and appends a single byte to
// the write buffer only if liveGoaways_ is set.
107  EXPECT_CALL(*codec_, generateGoaway(_, _, _, _))
108  .WillRepeatedly(Invoke([this](IOBufQueue& writeBuf,
109  HTTPCodec::StreamID /*lastStream*/,
110  ErrorCode,
111  std::shared_ptr<folly::IOBuf>) {
112  if (reusable_) {
113  reusable_ = false;
114  drainPending_ = doubleGoaway_;
115  } else if (!drainPending_) {
116  return 0;
117  } else {
118  drainPending_ = false;
119  }
120  if (liveGoaways_) {
121  writeBuf.append(string("x"));
122  }
123  return 1;
124  }));
125  EXPECT_CALL(*codec_, generateRstStream(_, _, _))
126  .WillRepeatedly(Return(1));
// addPriorityNodes() is expected exactly once (WillOnce), unlike the
// repeatable defaults around it.
127  EXPECT_CALL(*codec_, addPriorityNodes(_, _, _))
128  .WillOnce(Return(0));
129  EXPECT_CALL(*codec_, mapPriorityToDependency(_))
130  .WillRepeatedly(Return(0));
131 
// Build the session under test. Ownership of codec_ is handed over through
// the unique_ptr argument. NOTE(review): one constructor-argument line is
// missing from this listing.
132  HTTPSession::setDefaultReadBufferLimit(65536);
133  httpSession_ = new HTTPDownstreamSession(
134  transactionTimeouts_.get(),
136  &mockController_, std::unique_ptr<HTTPCodec>(codec_),
137  mockTransportInfo, nullptr);
138  httpSession_->startNow();
139  eventBase_.loop();
140  }
141 
// Remainder of the writeChain() stand-in that the constructor installs via
// Invoke(this, &MockCodecDownstreamTest::onWriteChain). The first line of its
// signature (which declares `callback`) is absent from this listing.
// Counts every write, then either completes it immediately with
// writeSuccess() or parks the callback in cbs_, depending on
// invokeWriteSuccess_.
143  std::shared_ptr<IOBuf> /*iob*/,
144  WriteFlags) {
145  writeCount_++;
146  if (invokeWriteSuccess_) {
147  callback->writeSuccess();
148  } else {
149  cbs_.push_back(callback);
150  }
151  }
152 
// Tail of a helper whose signature and the definition of `ex` are absent from
// this listing. It calls writeErr(0, ex) on every callback stored in cbs_
// (the 0 is presumably the bytes-written count — confirm). cbs_ is not
// cleared by the lines visible here.
155  for (auto& cb : cbs_) {
156  cb->writeErr(0, ex);
157  }
158  }
159 
// Fixture SetUp() override: sets HTTPSession's default write buffer limit to
// 65536.
160  void SetUp() override {
161  HTTPSession::setDefaultWriteBufferLimit(65536);
162  }
163 
164  // Pass a function to execute inside HTTPCodec::onIngress(). This
165  // function also takes care of passing an empty ingress buffer to the codec.
// NOTE(review): the CHECK_GT below requires the buffer handed to onIngress()
// to be non-empty, so "empty" above presumably means "dummy" — confirm.
//
// f is called with no arguments from a one-shot onIngress() expectation. The
// stand-in returns the buffer's full chain length. Ingress is then triggered
// by asking the saved transport read callback for a buffer and reporting the
// whole buffer size as available.
// The lambda captures f by reference; it is registered with WillOnce and the
// read is triggered before this function returns.
166  template<class T>
167  void onIngressImpl(T f) {
168  EXPECT_CALL(*codec_, onIngress(_))
169  .WillOnce(Invoke([&f] (const IOBuf& buf) {
170  CHECK_GT(buf.computeChainDataLength(), 0);
171  // The test should be independent of the dummy buffer,
172  // so don't pass it in.
173  f();
174  return buf.computeChainDataLength();
175  }));
176 
177  void* buf;
178  size_t bufSize;
179  transportCb_->getReadBuffer(&buf, &bufSize);
180  transportCb_->readDataAvailable(bufSize);
181  }
182 
// Parameterized test helpers; their definitions are not in this part of the
// file.
183  void testGoaway(bool doubleGoaway, bool dropConnection);
184 
185  void testConnFlowControlBlocked(bool timeout);
186 
187  protected:
188 
// NOTE(review): several member declarations (e.g. eventBase_, codec_,
// transport_, httpSession_, mockController_) are missing from this listing,
// including the one the next comment refers to.
190  // invalid once httpSession_ is destroyed
// Returned by reference from the codec's getUserAgent() expectation.
192  std::string userAgent_{"MockCodec"};
// Callback the session registered on the codec (saved from setCallback());
// tests inject codec events through it.
193  HTTPCodec::Callback* codecCallback_{nullptr};
// Last stream ID handed out by the createStream() stand-in; advanced by 2 on
// each call.
199  HTTPCodec::StreamID pushStreamID_{0};
// Value reported by isReusable(); cleared by the first generateGoaway().
200  bool reusable_{true};
// Value reported by the transport's good(); set to false by closeNow().
201  bool transportGood_{true};
// Value reported by isWaitingToDrain(); maintained by generateGoaway().
202  bool drainPending_{false};
// Set to true when the session calls enableDoubleGoawayDrain().
203  bool doubleGoaway_{false};
// When true, the generateGoaway() stand-in appends a byte to the write buffer.
204  bool liveGoaways_{false};
// When true, onWriteChain() completes writes immediately instead of parking
// the callback in cbs_.
205  bool invokeWriteSuccess_{false};
// Number of writeChain() calls seen by onWriteChain().
206  uint32_t writeCount_{0};
// Write callbacks parked by onWriteChain() while invokeWriteSuccess_ is false.
207  std::vector<folly::AsyncTransportWrapper::WriteCallback*> cbs_;
208 };
209 
// Two GET requests (streams 1 and 3) each get a 200 response with a 100-byte
// body queued. Stream 1 is then aborted by the peer and its transaction timer
// is fired by hand; stream 3 is expected to fail later with a write-timeout
// error that names its own transaction ID.
// NOTE(review): two lines of this test are missing from the listing (after
// the request setup and at the top of handler2's onError lambda).
210 TEST_F(MockCodecDownstreamTest, OnAbortThenTimeouts) {
211  // Test what happens when txn1 (out of many transactions) gets an abort
212  // followed by a transaction timeout followed by a write timeout
213  MockHTTPHandler handler1;
214  MockHTTPHandler handler2;
215  auto req1 = makeGetRequest();
216  auto req2 = makeGetRequest();
217 
219 
// The controller hands out handler1 for the first request and handler2 for
// the second.
220  EXPECT_CALL(mockController_, getRequestHandler(_, _))
221  .WillOnce(Return(&handler1))
222  .WillOnce(Return(&handler2));
223 
224  EXPECT_CALL(handler1, setTransaction(_))
225  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
226  handler1.txn_ = txn; }));
227  EXPECT_CALL(handler1, onHeadersComplete(_))
228  .WillOnce(Invoke([&handler1] (std::shared_ptr<HTTPMessage>) {
229  handler1.sendHeaders(200, 100);
230  handler1.sendBody(100);
231  }));
232  EXPECT_CALL(handler1, onError(_));
233  EXPECT_CALL(handler1, detachTransaction());
234  EXPECT_CALL(handler2, setTransaction(_))
235  .WillOnce(Invoke([&handler2] (HTTPTransaction* txn) {
236  handler2.txn_ = txn; }));
237  EXPECT_CALL(handler2, onHeadersComplete(_))
238  .WillOnce(Invoke([&handler2] (std::shared_ptr<HTTPMessage>) {
239  handler2.sendHeaders(200, 100);
240  handler2.sendBody(100);
241  }));
242  EXPECT_CALL(handler2, onBody(_));
// handler2's error text must be the write-timeout message for its own txn ID.
243  EXPECT_CALL(handler2, onError(_))
244  .WillOnce(Invoke([&] (const HTTPException& ex) {
246  ASSERT_EQ(
247  folly::to<std::string>("WriteTimeout on transaction id: ",
248  handler2.txn_->getID()),
249  std::string(ex.what()));
250  }));
251  EXPECT_CALL(handler2, detachTransaction());
252 
253  EXPECT_CALL(mockController_, detachSession(_));
254 
255  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
256  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
257  codecCallback_->onMessageBegin(HTTPCodec::StreamID(3), req2.get());
258  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(3), std::move(req2));
259  // do the write, enqueue byte event
260  eventBase_.loop();
261 
262  // recv an abort, detach the handler from txn1 (txn1 stays around due to the
263  // enqueued byte event)
264  codecCallback_->onAbort(HTTPCodec::StreamID(1), ErrorCode::PROTOCOL_ERROR);
265  // recv a transaction timeout on txn1 (used to erroneously create a direct
266  // response handler)
267  handler1.txn_->timeoutExpired();
268 
269  // have a write timeout expire (used to cause the direct response handler to
270  // write out data, messing up the state machine)
// Halfway through the default timeout, deliver 10 bytes of body on stream 3.
271  eventBase_.runAfterDelay(
272  [this] {
273  // refresh ingress timeout
274  codecCallback_->onBody(HTTPCodec::StreamID(3), makeBuf(10), 0);
275  }, transactionTimeouts_->getDefaultTimeout().count() / 2);
276  // hold evb open long enough to fire write timeout (since transactionTimeouts_
277  // is internal)
278  eventBase_.runAfterDelay(
279  [] {}, transactionTimeouts_->getDefaultTimeout().count() + 100);
280  eventBase_.loop();
281 }
282 
// Body of a server-push test. NOTE(review): its TEST_F line and the
// declaration of `handler` are missing from this listing.
// With expectations enforced in declaration order (InSequence): a request on
// stream 1 makes the handler open a pushed transaction, send push headers, a
// 100-byte body and EOM, and flush; the codec must then see a push promise
// and a 100-byte body on stream 2, followed by the normal response on
// stream 1.
285  MockHTTPPushHandler pushHandler;
286  auto req = makeGetRequest();
287  HTTPTransaction* pushTxn = nullptr;
288 
289  InSequence enforceOrder;
290 
291  EXPECT_CALL(mockController_, getRequestHandler(_, _))
292  .WillOnce(Return(&handler));
293  EXPECT_CALL(handler, setTransaction(_))
294  .WillOnce(SaveArg<0>(&handler.txn_));
295 
296  EXPECT_CALL(handler, onHeadersComplete(_))
297  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage>) {
298  pushTxn = handler.txn_->newPushedTransaction(&pushHandler);
299  pushHandler.sendPushHeaders("/foo", "www.foo.com", 100,
300  handler.txn_->getPriority());
301  pushHandler.sendBody(100);
302  pushTxn->sendEOM();
303  eventBase_.loop(); // flush the push txn's body
304  }));
305  EXPECT_CALL(pushHandler, setTransaction(_))
306  .WillOnce(Invoke([&pushHandler] (HTTPTransaction* txn) {
307  pushHandler.txn_ = txn; }));
308 
// Stream 2 is the first ID produced by the fixture's createStream() stand-in.
// The last generateBody() argument (true) is presumably the EOM flag — confirm.
309  EXPECT_CALL(*codec_, generatePushPromise(_, 2, _, _, _, _));
310  EXPECT_CALL(*codec_, generateBody(_, 2, PtrBufHasLen(uint64_t(100)),
311  _, true));
312  EXPECT_CALL(pushHandler, detachTransaction());
313 
314  EXPECT_CALL(handler, onEOM())
315  .WillOnce(Invoke([&] {
316  handler.sendReplyWithBody(200, 100);
317  eventBase_.loop(); // flush the response to the normal request
318  }));
319 
320  EXPECT_CALL(*codec_, generateHeader(_, 1, _, _, _));
321  EXPECT_CALL(*codec_, generateBody(_, 1, PtrBufHasLen(uint64_t(100)),
322  _, true));
323  EXPECT_CALL(handler, detachTransaction());
324 
// Drive the request on stream 1; the expectations above fire from inside
// these callbacks.
325  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req.get());
326  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req));
327  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
328 
// Tear down: dropping the connection must notify the codec of ingress EOF and
// detach the session from the controller.
329  EXPECT_CALL(*codec_, onIngressEOF());
330  EXPECT_CALL(mockController_, detachSession(_));
331  httpSession_->dropConnection();
332 }
333 
// Opens two pushed transactions (IDs 2 and 4) from the request handler, then
// delivers a GOAWAY whose last-stream argument is 2. Push 1 is expected to
// detach without error, push 2 to fail with a "StreamUnacknowledged" error,
// and a third newPushedTransaction() call to return nullptr.
// NOTE(review): the declaration of `handler` and a few other lines are
// missing from this listing.
334 TEST_F(MockCodecDownstreamTest, ServerPushAfterGoaway) {
335  // Tests if goaway
336  // - drains acknowledged server push transactions
337  // - aborts server pushed transactions not created at the client
338  // - prevents new transactions from being created.
340  MockHTTPPushHandler pushHandler1;
341  MockHTTPPushHandler pushHandler2;
342  HTTPTransaction* pushTxn = nullptr;
343 
345 
346  EXPECT_CALL(mockController_, getRequestHandler(_, _))
347  .WillOnce(Return(&handler));
348 
349  EXPECT_CALL(handler, setTransaction(_))
350  .WillOnce(Invoke([&handler] (HTTPTransaction* txn) {
351  handler.txn_ = txn; }));
352  EXPECT_CALL(handler, onHeadersComplete(_))
353  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage>) {
354  // Initiate server push transactions.
355  pushTxn = handler.txn_->newPushedTransaction(&pushHandler1);
356  CHECK_EQ(pushTxn->getID(), HTTPCodec::StreamID(2));
357  pushHandler1.sendPushHeaders("/foo", "www.foo.com", 100,
358  handler.txn_->getPriority());
359  pushHandler1.sendBody(100);
360  pushTxn->sendEOM();
361  // Initiate the second push transaction which will be aborted
362  pushTxn = handler.txn_->newPushedTransaction(&pushHandler2);
363  CHECK_EQ(pushTxn->getID(), HTTPCodec::StreamID(4));
364  pushHandler2.sendPushHeaders("/foo", "www.foo.com", 100,
365  handler.txn_->getPriority());
366  pushHandler2.sendBody(100);
367  pushTxn->sendEOM();
368  }));
369  // Push transaction 1 - drained
370  EXPECT_CALL(pushHandler1, setTransaction(_))
371  .WillOnce(Invoke([&pushHandler1] (HTTPTransaction* txn) {
372  pushHandler1.txn_ = txn; }));
373  EXPECT_CALL(pushHandler1, detachTransaction());
374  // Push transaction 2 - aborted by onError after goaway
375  EXPECT_CALL(pushHandler2, setTransaction(_))
376  .WillOnce(Invoke([&pushHandler2] (HTTPTransaction* txn) {
377  pushHandler2.txn_ = txn; }));
// The error text must name push transaction 2's own ID.
378  EXPECT_CALL(pushHandler2, onError(_))
379  .WillOnce(Invoke([&] (const HTTPException& err) {
382  ASSERT_EQ(
383  folly::to<std::string>("StreamUnacknowledged on transaction id: ",
384  pushHandler2.txn_->getID()),
385  std::string(err.what()));
386  }));
387  EXPECT_CALL(pushHandler2, detachTransaction());
388 
389  EXPECT_CALL(handler, onEOM());
390  EXPECT_CALL(handler, detachTransaction());
391 
392  // Receive client request
393  auto req = makeGetRequest();
394  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req.get());
395  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req));
396  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
397 
398  // Receive goaway acknowledging only the first pushed transactions with id 2.
399  codecCallback_->onGoaway(2, ErrorCode::NO_ERROR);
400 
401  // New server pushed transaction cannot be created after goaway
402  MockHTTPPushHandler pushHandler3;
403  EXPECT_EQ(handler.txn_->newPushedTransaction(&pushHandler3), nullptr);
404 
405  // Send response to the initial client request and this destroys the session
406  handler.sendReplyWithBody(200, 100);
407 
408  eventBase_.loop();
409 
410  EXPECT_CALL(mockController_, detachSession(_));
411  httpSession_->dropConnection();
412 }
413 
// Opens two pushed transactions (IDs 2 and 4), then delivers a peer abort
// with ErrorCode::CANCEL on stream 2. Only pushHandler1 is expected to see
// onError(); pushHandler2 and the request handler are expected to detach
// normally.
// NOTE(review): the declaration of `handler` and a few other lines are
// missing from this listing.
414 TEST_F(MockCodecDownstreamTest, ServerPushAbort) {
415  // Test that assoc txn and other push txns are not affected when client aborts
416  // a push txn
418  MockHTTPPushHandler pushHandler1;
419  MockHTTPPushHandler pushHandler2;
420  HTTPTransaction* pushTxn1 = nullptr;
421  HTTPTransaction* pushTxn2 = nullptr;
422 
424 
425  EXPECT_CALL(mockController_, getRequestHandler(_, _))
426  .WillOnce(Return(&handler));
427 
428  EXPECT_CALL(handler, setTransaction(_))
429  .WillOnce(Invoke([&handler] (HTTPTransaction* txn) {
430  handler.txn_ = txn; }));
431  EXPECT_CALL(handler, onHeadersComplete(_))
432  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage>) {
433  // Initiate server push transactions
// pushTxn1 gets headers and body but no EOM here, unlike pushTxn2 below.
434  pushTxn1 = handler.txn_->newPushedTransaction(&pushHandler1);
435  CHECK_EQ(pushTxn1->getID(), HTTPCodec::StreamID(2));
436  pushHandler1.sendPushHeaders("/foo", "www.foo.com", 100,
437  handler.txn_->getPriority());
438  pushHandler1.sendBody(100);
439 
440  pushTxn2 = handler.txn_->newPushedTransaction(&pushHandler2);
441  CHECK_EQ(pushTxn2->getID(), HTTPCodec::StreamID(4));
442  pushHandler2.sendPushHeaders("/bar", "www.bar.com", 200,
443  handler.txn_->getPriority());
444  pushHandler2.sendBody(200);
445  pushTxn2->sendEOM();
446  }));
447 
448  // pushTxn1 should be aborted
449  EXPECT_CALL(pushHandler1, setTransaction(_))
450  .WillOnce(Invoke([&pushHandler1] (HTTPTransaction* txn) {
451  pushHandler1.txn_ = txn; }));
// The error text must identify stream 2 and the CANCEL code.
452  EXPECT_CALL(pushHandler1, onError(_))
453  .WillOnce(Invoke([&] (const HTTPException& err) {
456  ASSERT_EQ(
457  "Stream aborted, streamID=2, code=CANCEL",
458  std::string(err.what()));
459  }));
460  EXPECT_CALL(pushHandler1, detachTransaction());
461 
462  EXPECT_CALL(pushHandler2, setTransaction(_))
463  .WillOnce(Invoke([&pushHandler2] (HTTPTransaction* txn) {
464  pushHandler2.txn_ = txn; }));
465  EXPECT_CALL(pushHandler2, detachTransaction());
466 
467  EXPECT_CALL(handler, onEOM());
468  EXPECT_CALL(handler, detachTransaction());
469 
470  // Receive client request
471  auto req = makeGetRequest();
472  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req.get());
473  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req));
474  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
475 
476  // Send client abort on one push txn
477  codecCallback_->onAbort(HTTPCodec::StreamID(2), ErrorCode::CANCEL);
478 
479  handler.sendReplyWithBody(200, 100);
480 
481  eventBase_.loop();
482 
483  EXPECT_CALL(mockController_, detachSession(_));
484  httpSession_->dropConnection();
485 }
486 
// Opens two pushed transactions (IDs 2 and 4) from the request on stream 1,
// then delivers a peer abort with ErrorCode::CANCEL on stream 1. All three
// handlers are expected to see onError() with the same message, which names
// stream 1 (the aborted stream), not the push stream IDs.
// NOTE(review): the declaration of `handler` and a few other lines are
// missing from this listing.
487 TEST_F(MockCodecDownstreamTest, ServerPushAbortAssoc) {
488  // Test that all associated push transactions are aborted when client aborts
489  // the assoc stream
491  MockHTTPPushHandler pushHandler1;
492  MockHTTPPushHandler pushHandler2;
493 
495 
496  EXPECT_CALL(mockController_, getRequestHandler(_, _))
497  .WillOnce(Return(&handler));
498 
499  EXPECT_CALL(handler, setTransaction(_))
500  .WillOnce(Invoke([&handler] (HTTPTransaction* txn) {
501  handler.txn_ = txn; }));
502  EXPECT_CALL(handler, onHeadersComplete(_))
503  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage>) {
504  // Initiate server push transactions
// Neither push transaction sends EOM; the event loop is run after each body.
505  auto pushTxn = handler.txn_->newPushedTransaction(&pushHandler1);
506  CHECK_EQ(pushTxn->getID(), HTTPCodec::StreamID(2));
507  pushHandler1.sendPushHeaders("/foo", "www.foo.com", 100,
508  handler.txn_->getPriority());
509  pushHandler1.sendBody(100);
510  eventBase_.loop();
511 
512  pushTxn = handler.txn_->newPushedTransaction(&pushHandler2);
513  CHECK_EQ(pushTxn->getID(), HTTPCodec::StreamID(4));
514  pushHandler2.sendPushHeaders("/foo", "www.foo.com", 100,
515  handler.txn_->getPriority());
516  pushHandler2.sendBody(100);
517  eventBase_.loop();
518  }));
519 
520  // Both push txns and the assoc txn should be aborted
521  EXPECT_CALL(pushHandler1, setTransaction(_))
522  .WillOnce(Invoke([&pushHandler1] (HTTPTransaction* txn) {
523  pushHandler1.txn_ = txn; }));
524  EXPECT_CALL(pushHandler1, onError(_))
525  .WillOnce(Invoke([&] (const HTTPException& err) {
528  ASSERT_EQ(
529  "Stream aborted, streamID=1, code=CANCEL",
530  std::string(err.what()));
531  }));
532  EXPECT_CALL(pushHandler1, detachTransaction());
533 
534  EXPECT_CALL(pushHandler2, setTransaction(_))
535  .WillOnce(Invoke([&pushHandler2] (HTTPTransaction* txn) {
536  pushHandler2.txn_ = txn; }));
537  EXPECT_CALL(pushHandler2, onError(_))
538  .WillOnce(Invoke([&] (const HTTPException& err) {
541  ASSERT_EQ(
542  "Stream aborted, streamID=1, code=CANCEL",
543  std::string(err.what()));
544  }));
545  EXPECT_CALL(pushHandler2, detachTransaction());
546 
547  EXPECT_CALL(handler, onError(_))
548  .WillOnce(Invoke([&] (const HTTPException& err) {
551  ASSERT_EQ(
552  "Stream aborted, streamID=1, code=CANCEL",
553  std::string(err.what()));
554  }));
555  EXPECT_CALL(handler, detachTransaction());
556 
557  // Receive client request
// No onMessageComplete() here: the request is still open when it is aborted.
558  auto req = makeGetRequest();
559  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req.get());
560  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req));
561 
562  // Send client abort on assoc stream
563  codecCallback_->onAbort(HTTPCodec::StreamID(1), ErrorCode::CANCEL);
564 
565  eventBase_.loop();
566 
567  EXPECT_CALL(mockController_, detachSession(_));
568  httpSession_->dropConnection();
569 }
570 
// With expectations enforced in declaration order (InSequence): the request
// handler opens a pushed transaction, then the test delivers onMessageBegin()
// on the push stream's ID (2). The codec is expected to be asked for a
// RST_STREAM with STREAM_CLOSED on stream 2, and pushHandler to see an error
// with that status code. The request on stream 1 then completes normally.
// NOTE(review): the declaration of `handler` and one line of the onError
// lambda are missing from this listing.
571 TEST_F(MockCodecDownstreamTest, ServerPushClientMessage) {
572  // Test that error is generated when client sends data on a pushed stream
574  MockHTTPPushHandler pushHandler;
575  auto req = makeGetRequest();
576  HTTPTransaction* pushTxn = nullptr;
577 
578  InSequence enforceOrder;
579 
580  EXPECT_CALL(mockController_, getRequestHandler(_, _))
581  .WillOnce(Return(&handler));
582  EXPECT_CALL(handler, setTransaction(_))
583  .WillOnce(SaveArg<0>(&handler.txn_));
584 
// Open the push transaction and copy the request transaction's priority
// (dependency, exclusive flag, weight) onto the received message.
585  EXPECT_CALL(handler, onHeadersComplete(_))
586  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage> msg) {
587  pushTxn = handler.txn_->newPushedTransaction(&pushHandler);
588  auto pri = handler.txn_->getPriority();
589  msg->setHTTP2Priority(std::make_tuple(pri.streamDependency,
590  pri.exclusive, pri.weight));
591  }));
592  EXPECT_CALL(pushHandler, setTransaction(_))
593  .WillOnce(Invoke([&pushHandler] (HTTPTransaction* txn) {
594  pushHandler.txn_ = txn; }));
595 
596  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req.get());
597  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req));
598 
599  EXPECT_CALL(*codec_, generateRstStream(_, 2, ErrorCode::STREAM_CLOSED))
600  .WillRepeatedly(Return(1));
601  EXPECT_CALL(pushHandler, onError(_))
602  .WillOnce(Invoke([&] (const HTTPException& ex) {
604  EXPECT_EQ(ex.getCodecStatusCode(), ErrorCode::STREAM_CLOSED);
605  ASSERT_EQ(
606  "Downstream attempts to send ingress, abort.",
607  std::string(ex.what()));
608  }));
609  EXPECT_CALL(pushHandler, detachTransaction());
610 
611  // While the assoc stream is open and pushHandler has been initialized, send
612  // an upstream message on the push stream causing a RST_STREAM.
613  req = makeGetRequest();
614  codecCallback_->onMessageBegin(HTTPCodec::StreamID(2), req.get());
615 
616  EXPECT_CALL(handler, onEOM())
617  .WillOnce(InvokeWithoutArgs([&] {
618  handler.sendReplyWithBody(200, 100);
619  eventBase_.loop(); // flush the response to the assoc request
620  }));
621  EXPECT_CALL(*codec_, generateHeader(_, 1, _, _, _));
622  EXPECT_CALL(*codec_, generateBody(_, 1, PtrBufHasLen(uint64_t(100)),
623  _, true));
624  EXPECT_CALL(handler, detachTransaction());
625 
626  // Complete the assoc request/response
627  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
628 
629  eventBase_.loop();
630 
631  EXPECT_CALL(*codec_, onIngressEOF());
632  EXPECT_CALL(mockController_, detachSession(_));
633  httpSession_->dropConnection();
634 }
635 
// Body of a read-timeout test. NOTE(review): its TEST_F line, parts of the
// writeChain lambda signature and the expected value of the final EXPECT_EQ
// are missing from this listing.
// The session timer is fired by hand three times: after headers, after EOM
// with ingress paused, and after the response has been written. After each of
// the first two the close reason must still be kMAX_REASON.
637  // Test read timeout path
638  MockHTTPHandler handler1;
639  auto req1 = makeGetRequest();
640 
642  EXPECT_CALL(*codec_, onIngressEOF())
643  .WillRepeatedly(Return());
644 
645  EXPECT_CALL(mockController_, getRequestHandler(_, _))
646  .WillOnce(Return(&handler1));
647 
648  EXPECT_CALL(handler1, setTransaction(_))
649  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
650  handler1.txn_ = txn; }));
651  EXPECT_CALL(handler1, onHeadersComplete(_));
652 
653  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
654  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
655  // force the read timeout to expire, should be a no-op because the txn is
656  // still expecting EOM and has its own timer.
657  httpSession_->timeoutExpired();
658  EXPECT_EQ(httpSession_->getConnectionCloseReason(),
659  ConnectionCloseReason::kMAX_REASON);
660 
661  EXPECT_CALL(handler1, onEOM())
662  .WillOnce(Invoke([&handler1] () {
663  handler1.txn_->pauseIngress();
664  }));
665 
666  // send the EOM, then another timeout. Still no-op since it's waiting
667  // upstream
668  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
669  httpSession_->timeoutExpired();
670  EXPECT_EQ(httpSession_->getConnectionCloseReason(),
671  ConnectionCloseReason::kMAX_REASON);
672 
// From here on every transport write completes immediately; this replaces the
// fixture's default writeChain() behavior for the rest of the test.
673  EXPECT_CALL(*transport_, writeChain(_, _, _))
674  .WillRepeatedly(Invoke([] (
676  std::shared_ptr<folly::IOBuf>,
678  callback->writeSuccess();
679  }));
680 
681  EXPECT_CALL(handler1, detachTransaction());
682 
683  // Send the response, timeout. Now it's idle and should close.
684  handler1.txn_->resumeIngress();
685  handler1.sendReplyWithBody(200, 100);
686  eventBase_.loop();
687 
688  httpSession_->timeoutExpired();
689  EXPECT_EQ(httpSession_->getConnectionCloseReason(),
691 
692  // tear down the test
693  EXPECT_CALL(mockController_, detachSession(_));
694  httpSession_->dropConnection();
695 }
696 
// Body of a ping test. NOTE(review): its TEST_F line is missing from this
// listing.
// With expectations enforced in declaration order (InSequence), the codec
// must be asked for the response header, then the ping reply, then the
// response body, even though the ping request is delivered after the handler
// has already queued its reply.
698  // Test ping mechanism and that we prioritize the ping reply
699  MockHTTPHandler handler1;
700  auto req1 = makeGetRequest();
701 
702  InSequence enforceOrder;
703 
704  EXPECT_CALL(mockController_, getRequestHandler(_, _))
705  .WillOnce(Return(&handler1));
706 
707  EXPECT_CALL(handler1, setTransaction(_))
708  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
709  handler1.txn_ = txn; }));
710  EXPECT_CALL(handler1, onHeadersComplete(_));
711  EXPECT_CALL(handler1, onEOM())
712  .WillOnce(InvokeWithoutArgs([&handler1] () {
713  handler1.sendReplyWithBody(200, 100);
714  }));
715 
716  // Header egresses immediately
717  EXPECT_CALL(*codec_, generateHeader(_, _, _, _, _));
718  // Ping jumps ahead of queued body in the loop callback
719  EXPECT_CALL(*codec_, generatePingReply(_, _));
720  EXPECT_CALL(*codec_, generateBody(_, _, _, _, true));
721  EXPECT_CALL(handler1, detachTransaction());
722 
// Deliver the complete request, then a ping request, before running the loop.
723  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
724  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
725  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
726  codecCallback_->onPingRequest(1);
727 
728  eventBase_.loop();
729 
730  EXPECT_CALL(*codec_, onIngressEOF());
731  EXPECT_CALL(mockController_, detachSession(_));
732  httpSession_->dropConnection();
733 }
734 
// The handler aborts its transaction as soon as headers arrive. Two onBody()
// calls are then delivered on stream 1, and each is expected to be followed
// by a generateWindowUpdate() for stream 0 with spdy::kInitialWindow.
// Expectations are enforced in declaration order (InSequence).
// NOTE(review): the second line of each onBody() call (the buffer arguments)
// is missing from this listing.
735 TEST_F(MockCodecDownstreamTest, FlowControlAbort) {
736  MockHTTPHandler handler1;
737  auto req1 = makePostRequest();
738 
739  InSequence enforceOrder;
740 
741  EXPECT_CALL(mockController_, getRequestHandler(_, _))
742  .WillOnce(Return(&handler1));
743 
744  EXPECT_CALL(handler1, setTransaction(_))
745  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
746  handler1.txn_ = txn; }));
747  EXPECT_CALL(handler1, onHeadersComplete(_))
748  .WillOnce(InvokeWithoutArgs([&handler1] () {
749  handler1.txn_->sendAbort();
750  }));
751 
// NOTE(review): the next comment looks stale — this handler calls sendAbort()
// and sends no headers; confirm and reword.
752  // Header egresses immediately
753  EXPECT_CALL(handler1, detachTransaction());
754 
755  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
756  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
757  EXPECT_CALL(*codec_, generateWindowUpdate(_, 0, spdy::kInitialWindow));
758  codecCallback_->onBody(HTTPCodec::StreamID(1),
760  EXPECT_CALL(*codec_, generateWindowUpdate(_, 0, spdy::kInitialWindow));
761  codecCallback_->onBody(HTTPCodec::StreamID(1),
763 
764  eventBase_.loop();
765 
766  EXPECT_CALL(*codec_, onIngressEOF());
767  EXPECT_CALL(mockController_, detachSession(_));
768  httpSession_->dropConnection();
769 }
770 
// Body of an ingress-buffering test. NOTE(review): its TEST_F line, the
// declaration of `handler` and part of the writeChain lambda signature are
// missing from this listing.
// The handler pauses ingress at headers. Two 10-byte chunks and the
// end-of-message are delivered while paused. 30 ms later ingress is resumed,
// and the handler must then receive both chunks (matching chunkStr) followed
// by EOM.
773  auto req1 = makePostRequest(20);
774  auto chunk = makeBuf(10);
775  auto chunkStr = chunk->clone()->moveToFbString();
776 
778 
// Shrink the default read buffer limit to 10 for this test.
779  httpSession_->setDefaultReadBufferLimit(10);
780 
781  EXPECT_CALL(mockController_, getRequestHandler(_, _))
782  .WillOnce(Return(&handler));
783 
784  EXPECT_CALL(handler, setTransaction(_))
785  .WillOnce(Invoke([&handler] (HTTPTransaction* txn) {
786  handler.txn_ = txn; }));
787  EXPECT_CALL(handler, onHeadersComplete(_))
788  .WillOnce(InvokeWithoutArgs([&handler] () {
789  handler.txn_->pauseIngress();
790  }));
791 
// Every transport write completes immediately.
792  EXPECT_CALL(*transport_, writeChain(_, _, _))
793  .WillRepeatedly(Invoke([&] (
795  const shared_ptr<IOBuf>&,
796  WriteFlags) {
797  callback->writeSuccess();
798  }));
799 
800  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
801  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
802  for (int i = 0; i < 2; i++) {
803  codecCallback_->onBody(HTTPCodec::StreamID(1), chunk->clone(), 0);
804  }
805  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
806 
// These expectations are registered only after the body and EOM were
// delivered above, i.e. while ingress was paused.
807  EXPECT_CALL(handler, onBody(_))
808  .WillOnce(ExpectString(chunkStr))
809  .WillOnce(ExpectString(chunkStr));
810 
811  EXPECT_CALL(handler, onEOM());
812 
813  EXPECT_CALL(handler, detachTransaction());
814 
// After 30 ms: resume ingress, send the reply, and schedule the connection
// drop on the event loop.
815  eventBase_.tryRunAfterDelay([&handler, this] {
816  handler.txn_->resumeIngress();
817  handler.sendReplyWithBody(200, 100);
818  eventBase_.runInLoop([this] { httpSession_->dropConnection(); });
819  }, 30);
820 
821  EXPECT_CALL(*codec_, onIngressEOF());
822  EXPECT_CALL(mockController_, detachSession(_));
823  eventBase_.loop();
824 }
825 
// Body of a flow-control window test. NOTE(review): its TEST_F line and parts
// of the writeChain lambda signature are missing from this listing.
// Inside the braced scope, expectations are enforced in declaration order
// (InSequence). The peer sets INITIAL_WINDOW_SIZE to 4000, the handler queues
// 12000 body bytes, and a series of window updates on stream 1 drive the
// expected pause/resume callbacks. The inline comments track the
// sent/buffered byte counts at each step.
827  // Test window updates
828  MockHTTPHandler handler1;
829  auto req1 = makeGetRequest();
830 
832 
833  {
834  InSequence enforceOrder;
835  EXPECT_CALL(mockController_, getRequestHandler(_, _))
836  .WillOnce(Return(&handler1));
837 
838  EXPECT_CALL(handler1, setTransaction(_))
839  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
840  handler1.txn_ = txn; }));
// Deliver the peer's SETTINGS from inside onHeadersComplete(); the codec is
// then expected to be asked for a settings ack.
841  EXPECT_CALL(handler1, onHeadersComplete(_))
842  .WillOnce(InvokeWithoutArgs([this] () {
843  codecCallback_->onSettings(
844  {{SettingsId::INITIAL_WINDOW_SIZE, 4000}});
845  }));
846  EXPECT_CALL(*codec_, generateSettingsAck(_));
847  EXPECT_CALL(handler1, onEOM())
848  .WillOnce(InvokeWithoutArgs([&handler1] () {
849  handler1.sendHeaders(200, 16000);
850  handler1.sendBody(12000);
851  // 12kb buffered -> pause upstream
852  }));
// First pause: schedule three window updates — one in the next loop
// iteration, one after 10 ms and one after 20 ms.
853  EXPECT_CALL(handler1, onEgressPaused())
854  .WillOnce(InvokeWithoutArgs([this] () {
855  eventBase_.runInLoop([this] {
856  // triggers 4k send, 8kb buffered, handler still paused
857  codecCallback_->onWindowUpdate(1, 4000);
858  });
859  eventBase_.runAfterDelay([this] {
860  // triggers 6k send, 2kb buffered, handler still paused
861  codecCallback_->onWindowUpdate(1, 6000);
862  }, 10);
863  eventBase_.runAfterDelay([this] {
864  // triggers 2kb send, 0 buffered, 2k window => resume
865  codecCallback_->onWindowUpdate(1, 4000);
866  }, 20);
867  }));
868  EXPECT_CALL(handler1, onEgressResumed())
869  .WillOnce(InvokeWithoutArgs([&handler1] () {
870  handler1.sendBody(4000);
871  // 2kb send, 2kb buffered => pause upstream
872  }));
873  EXPECT_CALL(handler1, onEgressPaused())
874  .WillOnce(InvokeWithoutArgs([this] () {
875  eventBase_.runInLoop([this] {
876  // triggers 2kb send, resume
877  codecCallback_->onWindowUpdate(1, 4000);
878  });
879  }));
880  EXPECT_CALL(handler1, onEgressResumed())
881  .WillOnce(InvokeWithoutArgs([&handler1] () {
882  handler1.txn_->sendEOM();
883  }));
884 
885  EXPECT_CALL(handler1, detachTransaction());
886 
887  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
888  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
889  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
890  // Pad coverage numbers
// Streams the transaction, the session and both addresses into a throwaway
// ostringstream; the output itself is not checked.
891  std::ostringstream stream;
892  stream << *handler1.txn_ << *httpSession_
893  << httpSession_->getLocalAddress()
894  << httpSession_->getPeerAddress();
895  EXPECT_TRUE(httpSession_->isBusy());
896 
897  EXPECT_CALL(*codec_, onIngressEOF());
898  EXPECT_CALL(mockController_, detachSession(_));
899  }
900 
// Registered outside the InSequence scope: every transport write completes
// immediately.
901  EXPECT_CALL(*transport_, writeChain(_, _, _))
902  .WillRepeatedly(Invoke([] (
904  std::shared_ptr<folly::IOBuf>,
906  callback->writeSuccess();
907  }));
908  eventBase_.loop();
909  httpSession_->dropConnection();
910 }
911 
// Body of an ingress pause/resume test. NOTE(review): its TEST_F line and
// parts of the writeChain lambda signature are missing from this listing.
// The handler pauses ingress at headers and resumes it 50 ms later. When the
// 5-byte body is delivered it pauses and immediately resumes again, then
// replies on EOM.
// NOTE(review): the leading comment below mentions ping and egress
// re-ordering, but no ping call is visible in this body — it looks copied
// from another test; confirm and reword.
913  // Test spdy ping mechanism and egress re-ordering
914  MockHTTPHandler handler1;
915  auto req1 = makePostRequest(5);
916  auto buf = makeBuf(5);
917  auto bufStr = buf->clone()->moveToFbString();
918 
920 
921  EXPECT_CALL(mockController_, getRequestHandler(_, _))
922  .WillOnce(Return(&handler1));
923 
924  EXPECT_CALL(handler1, setTransaction(_))
925  .WillOnce(Invoke([&handler1] (HTTPTransaction* txn) {
926  handler1.txn_ = txn; }));
927  EXPECT_CALL(handler1, onHeadersComplete(_))
928  .WillOnce(InvokeWithoutArgs([&handler1, this] {
929  handler1.txn_->pauseIngress();
930  eventBase_.tryRunAfterDelay([&handler1] {
931  handler1.txn_->resumeIngress();
932  }, 50);
933  }));
// The body handed to the handler must equal the bytes that were delivered.
934  EXPECT_CALL(handler1, onBody(_))
935  .WillOnce(Invoke([&handler1, &bufStr] (
936  std::shared_ptr<folly::IOBuf> chain) {
937  EXPECT_EQ(bufStr, chain->moveToFbString());
938  handler1.txn_->pauseIngress();
939  handler1.txn_->resumeIngress();
940  }));
941 
942  EXPECT_CALL(handler1, onEOM())
943  .WillOnce(InvokeWithoutArgs([&handler1] () {
944  handler1.sendReplyWithBody(200, 100, false);
945  }));
946  EXPECT_CALL(handler1, detachTransaction());
947 
// The whole request is delivered up front; the first pause happens inside
// onHeadersComplete().
948  codecCallback_->onMessageBegin(HTTPCodec::StreamID(1), req1.get());
949  codecCallback_->onHeadersComplete(HTTPCodec::StreamID(1), std::move(req1));
950  codecCallback_->onBody(HTTPCodec::StreamID(1), std::move(buf), 0);
951  codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);
952 
953  EXPECT_CALL(*codec_, onIngressEOF());
954  EXPECT_CALL(mockController_, detachSession(_));
955 
// Every transport write completes immediately.
956  EXPECT_CALL(*transport_, writeChain(_, _, _))
957  .WillRepeatedly(Invoke([] (
959  std::shared_ptr<folly::IOBuf>,
961  callback->writeSuccess();
962  }));
963 
964  eventBase_.loop();
965  httpSession_->dropConnection();
966 }
967 
969  // Let the connection level flow control window fill and then make sure
970  // control frames still can be processed
971  InSequence enforceOrder;
972  NiceMock<MockHTTPHandler> handler1;
973  NiceMock<MockHTTPHandler> handler2;
974  auto wantToWrite = spdy::kInitialWindow + 50000;
975  auto wantToWriteStr = folly::to<string>(wantToWrite);
976  auto req1 = makeGetRequest();
977  auto req2 = makeGetRequest();
978  auto resp1 = makeResponse(200);
979  resp1->getHeaders().set(HTTP_HEADER_CONTENT_LENGTH, wantToWriteStr);
980  auto resp2 = makeResponse(200);
981  resp2->getHeaders().set(HTTP_HEADER_CONTENT_LENGTH, wantToWriteStr);
982  invokeWriteSuccess_ = true;
983 
984  EXPECT_CALL(mockController_, getRequestHandler(_, _))
985  .WillOnce(Return(&handler1));
986  EXPECT_CALL(handler1, setTransaction(_))
987  .WillOnce(SaveArg<0>(&handler1.txn_));
988  EXPECT_CALL(handler1, onHeadersComplete(_));
989  EXPECT_CALL(*codec_, generateHeader(_, 1, _, _, _))
990  .WillOnce(Invoke([] (folly::IOBufQueue& writeBuf,
992  const HTTPMessage&,
993  bool,
994  HTTPHeaderSize*) {
995  writeBuf.append("", 1);
996  }));
997  unsigned bodyLen = 0;
998  EXPECT_CALL(*codec_, generateBody(_, 1, _, _, false))
999  .WillRepeatedly(Invoke([&](folly::IOBufQueue& /*writeBuf*/,
1001  std::shared_ptr<folly::IOBuf> chain,
1003  bool /*eom*/) {
1004  bodyLen += chain->computeChainDataLength();
1005  return 0; // don't want byte events
1006  }));
1007 
1008  codecCallback_->onMessageBegin(1, req1.get());
1009  codecCallback_->onHeadersComplete(1, std::move(req1));
1010  codecCallback_->onWindowUpdate(1, wantToWrite); // ensure the per-stream
1011  // window doesn't block
1012  handler1.txn_->sendHeaders(*resp1);
1013  handler1.txn_->sendBody(makeBuf(wantToWrite)); // conn blocked, stream open
1014  handler1.txn_->sendEOM();
1015  eventBase_.loop(); // actually send (most of) the body
1016  CHECK_EQ(bodyLen, spdy::kInitialWindow); // should have written a full window
1017 
1018  EXPECT_CALL(mockController_, getRequestHandler(_, _))
1019  .WillOnce(Return(&handler2));
1020  EXPECT_CALL(handler2, setTransaction(_))
1021  .WillOnce(SaveArg<0>(&handler2.txn_));
1022  EXPECT_CALL(handler2, onHeadersComplete(_));
1023  EXPECT_CALL(*codec_, generateHeader(_, 3, _, _, _))
1024  .WillOnce(Invoke([] (folly::IOBufQueue& writeBuf,
1026  const HTTPMessage&,
1027  bool,
1028  HTTPHeaderSize*) {
1029  writeBuf.append("", 1);
1030  }));
1031 
1032  auto writeCount = writeCount_;
1033 
1034  // Make sure we can send headers of response to a second request
1035  codecCallback_->onMessageBegin(3, req2.get());
1036  codecCallback_->onHeadersComplete(3, std::move(req2));
1037  handler2.txn_->sendHeaders(*resp2);
1038 
1039  eventBase_.loop();
1040 
1041  EXPECT_EQ(writeCount + 1, writeCount_);
1042 
1043  if (timeout) {
1044  // don't send a window update, the handlers will get timeouts
1045  EXPECT_CALL(handler1, onError(_))
1046  .WillOnce(Invoke([] (const HTTPException& ex) {
1048  }));
1049  EXPECT_CALL(handler2, onError(_))
1050  .WillOnce(Invoke([] (const HTTPException& ex) {
1052  }));
1053  EXPECT_CALL(mockController_, detachSession(_));
1054  // send a window update to refresh the stream level timeout
1055  codecCallback_->onWindowUpdate(1, 1);
1056  // silly, the timeout set is internal and there's no fd, so hold the
1057  // eventBase open until the timeout can fire
1058  eventBase_.runAfterDelay([] {}, 500);
1059 
1060  transactionTimeouts_->cancelAll();
1061  } else {
1062  // Give a connection level window update of 10 bytes -- this
1063  // should allow 10 bytes of the txn1 response to be written
1064  codecCallback_->onWindowUpdate(0, 10);
1065  EXPECT_CALL(*codec_, generateBody(_, 1, PtrBufHasLen(uint64_t(10)),
1066  _, false));
1067  eventBase_.loop();
1068 
1069  // Just tear everything down now.
1070  EXPECT_CALL(handler1, detachTransaction());
1071  codecCallback_->onAbort(handler1.txn_->getID(),
1072  ErrorCode::INTERNAL_ERROR);
1073  eventBase_.loop();
1074 
1075  EXPECT_CALL(handler2, detachTransaction());
1076  EXPECT_CALL(mockController_, detachSession(_));
1077  httpSession_->dropConnection();
1078  }
1079 
1080  eventBase_.loop();
1081 }
1082 
1083 TEST_F(MockCodecDownstreamTest, ConnFlowControlBlocked) {
1084  testConnFlowControlBlocked(false);
1085 }
1086 
1087 TEST_F(MockCodecDownstreamTest, ConnFlowControlTimeout) {
1088  testConnFlowControlBlocked(true);
1089 }
1090 
1091 TEST_F(MockCodecDownstreamTest, UnpausedLargePost) {
1092  // Make sure that a large POST that streams into the handler generates
1093  // connection level flow control so that the entire POST can be received.
1094  InSequence enforceOrder;
1095  NiceMock<MockHTTPHandler> handler1;
1096  unsigned kNumChunks = 10;
1097  auto wantToWrite = spdy::kInitialWindow * kNumChunks;
1098  auto wantToWriteStr = folly::to<string>(wantToWrite);
1099  auto req1 = makePostRequest(wantToWrite);
1100  auto req1Body = makeBuf(wantToWrite);
1101 
1102  EXPECT_CALL(mockController_, getRequestHandler(_, _))
1103  .WillOnce(Return(&handler1));
1104  EXPECT_CALL(handler1, setTransaction(_))
1105  .WillOnce(SaveArg<0>(&handler1.txn_));
1106 
1107  EXPECT_CALL(handler1, onHeadersComplete(_));
1108  for (unsigned i = 0; i < kNumChunks; ++i) {
1109  EXPECT_CALL(*codec_, generateWindowUpdate(_, 0, spdy::kInitialWindow));
1110  EXPECT_CALL(handler1, onBody(PtrBufHasLen(spdy::kInitialWindow)));
1111  EXPECT_CALL(*codec_, generateWindowUpdate(_, 1, spdy::kInitialWindow));
1112  }
1113  EXPECT_CALL(handler1, onEOM());
1114 
1115  codecCallback_->onMessageBegin(1, req1.get());
1116  codecCallback_->onHeadersComplete(1, std::move(req1));
1117  // Give kNumChunks chunks, each of the maximum window size. We should generate
1118  // window update for each chunk
1119  for (unsigned i = 0; i < kNumChunks; ++i) {
1120  codecCallback_->onBody(1, makeBuf(spdy::kInitialWindow), 0);
1121  }
1122  codecCallback_->onMessageComplete(1, false);
1123 
1124  // Just tear everything down now.
1125  EXPECT_CALL(mockController_, detachSession(_));
1126  httpSession_->dropConnection();
1127 }
1128 
// Verifies that egress keeps flowing while the handler has ingress paused:
// window updates arriving on the wire must still be processed so that the
// whole response body (10 * spdy::kInitialWindow bytes) gets written.
1129 TEST_F(MockCodecDownstreamTest, IngressPausedWindowUpdate) {
1130  // Test sending a large response body while the handler has ingress paused. We
1131  // should process the ingress window_updates and deliver the full body
1132  InSequence enforceOrder;
1133  NiceMock<MockHTTPHandler> handler1;
1134  auto req = makeGetRequest();
1135  size_t respSize = spdy::kInitialWindow * 10;
1136  unique_ptr<HTTPMessage> resp;
1137  unique_ptr<folly::IOBuf> respBody;
1138  tie(resp, respBody) = makeResponse(200, respSize);
      // Running total of body bytes handed to the codec's generateBody().
1139  size_t written = 0;
1140 
1141  EXPECT_CALL(mockController_, getRequestHandler(_, _))
1142  .WillOnce(Return(&handler1));
1143  EXPECT_CALL(handler1, setTransaction(_))
1144  .WillOnce(SaveArg<0>(&handler1.txn_));
1145 
1146  EXPECT_CALL(handler1, onHeadersComplete(_))
1147  .WillOnce(InvokeWithoutArgs([&] () {
1148  // Pause ingress. Make sure we process the window updates anyway
1149  handler1.txn_->pauseIngress();
1150  }));
1151  EXPECT_CALL(*codec_, generateHeader(_, _, _, _, _));
      // Each generateBody() call adds the chunk length to `written` and
      // returns that length as the number of bytes generated.
      // NOTE(review): some lambda parameter lines are absent from this
      // listing; only the visible ones are described here.
1152  EXPECT_CALL(*codec_, generateBody(_, _, _, _, _))
1153  .WillRepeatedly(Invoke([&](folly::IOBufQueue&,
1155  std::shared_ptr<folly::IOBuf> chain,
1157  bool /*eom*/) {
1158  auto len = chain->computeChainDataLength();
1159  written += len;
1160  return len;
1161  }));
1162 
1163  codecCallback_->onWindowUpdate(0, respSize); // open conn-level window
1164  codecCallback_->onMessageBegin(1, req.get());
1165  codecCallback_->onHeadersComplete(1, std::move(req));
      // The onHeadersComplete action above paused ingress on the transaction.
1166  EXPECT_TRUE(handler1.txn_->isIngressPaused());
1167 
1168  // Unblock txn-level flow control and try to egress the body
1169  codecCallback_->onWindowUpdate(1, respSize);
1170  handler1.txn_->sendHeaders(*resp);
1171  handler1.txn_->sendBody(std::move(respBody));
1172 
      // Run the event loop so the queued egress is written, then check that
      // every byte of the response body reached generateBody().
1173  eventBase_.loop();
1174  EXPECT_EQ(written, respSize);
1175 
1176  // Just tear everything down now.
1177  EXPECT_CALL(mockController_, detachSession(_));
1178  httpSession_->dropConnection();
1179 }
1180 
// Verifies that a pushed transaction created before notifyPendingShutdown()
// can still send its push headers and EOM afterwards, and that the codec
// stays reusable throughout.
1181 TEST_F(MockCodecDownstreamTest, ShutdownThenSendPushHeaders) {
1182  // Test that notifying session of shutdown before sendHeaders() called on a
1183  // pushed txn lets that push txn finish.
1184  EXPECT_CALL(*codec_, supportsPushTransactions())
1185  .WillRepeatedly(Return(true));
1186 
      // Everything expected below must happen in the order it is declared.
      // NOTE(review): the declaration of `handler` is absent from this listing.
1187  InSequence enforceOrder;
1189  MockHTTPPushHandler pushHandler;
1190  auto req = makeGetRequest();
1191 
1192  EXPECT_CALL(mockController_, getRequestHandler(_, _))
1193  .WillOnce(Return(&handler));
1194  EXPECT_CALL(handler, setTransaction(_))
1195  .WillOnce(SaveArg<0>(&handler.txn_));
1196 
      // When the request headers arrive: create the pushed transaction, start
      // shutdown, then send the push headers and EOM on the pushed stream.
1197  EXPECT_CALL(handler, onHeadersComplete(_))
1198  .WillOnce(Invoke([&] (std::shared_ptr<HTTPMessage>) {
1199  auto pushTxn = handler.txn_->newPushedTransaction(&pushHandler);
1200  // start shutdown process
1201  httpSession_->notifyPendingShutdown();
1202  // we should be able to process new requests
1203  EXPECT_TRUE(codec_->isReusable());
1204  pushHandler.sendPushHeaders("/foo", "www.foo.com", 0,
1205  handler.txn_->getPriority());
1206  // we should *still* be able to process new requests
1207  EXPECT_TRUE(codec_->isReusable());
1208  pushTxn->sendEOM();
1209  }));
      // Expected order: the pushed stream (id 2) is set up, promised, ended
      // and detached first ...
1210  EXPECT_CALL(pushHandler, setTransaction(_))
1211  .WillOnce(SaveArg<0>(&pushHandler.txn_));
1212  EXPECT_CALL(*codec_, generatePushPromise(_, 2, _, _, _, _));
1213  EXPECT_CALL(*codec_, generateEOM(_, 2));
1214  EXPECT_CALL(pushHandler, detachTransaction());
      // ... then the original request (stream 1) gets its EOM, is answered
      // with sendReply(), and is detached.
1215  EXPECT_CALL(handler, onEOM())
1216  .WillOnce(Invoke([&] {
1217  handler.sendReply();
1218  }));
1219  EXPECT_CALL(*codec_, generateHeader(_, 1, _, _, _));
1220  EXPECT_CALL(*codec_, generateEOM(_, 1));
1221  EXPECT_CALL(handler, detachTransaction());
1222 
      // Drive a complete request on stream 1 through the codec callback.
1223  codecCallback_->onMessageBegin(1, req.get());
1224  codecCallback_->onHeadersComplete(1, std::move(req));
1225  codecCallback_->onMessageComplete(1, false);
1226 
1227  // finish shutdown
1228  EXPECT_CALL(*codec_, onIngressEOF());
1229  EXPECT_CALL(mockController_, detachSession(_));
1230  httpSession_->dropConnection();
1231 
1232  eventBase_.loop();
1233 }
1234 
1235 TEST_F(MockCodecDownstreamTest, ReadIobufChainShutdown) {
1236  // Given an ingress IOBuf chain of 2 parts, if we shutdown after reading the
1237  // first part of the chain, we shouldn't read the second part. One way to
1238  // simulate a 2 part chain is to put more ingress in readBuf while we are
1239  // inside HTTPCodec::onIngress()
1240 
1241  InSequence enforceOrder;
1242 
1243  auto f = [&] () {
1244  void* buf;
1245  size_t bufSize;
1246  transportCb_->getReadBuffer(&buf, &bufSize);
1247  transportCb_->readDataAvailable(bufSize);
1248  };
1249 
1250  EXPECT_CALL(*codec_, onIngress(_))
1251  .WillOnce(Invoke([&] (const IOBuf& buf) {
1252  // This first time, don't process any data. This will cause the
1253  // ingress chain to grow in size later.
1254  EXPECT_FALSE(buf.isChained());
1255  return 0;
1256  }))
1257  .WillOnce(Invoke([&] (const IOBuf& buf) {
1258  // Now there should be a second buffer in the chain.
1259  EXPECT_TRUE(buf.isChained());
1260  // Shutdown writes. This enough to destroy the session.
1261  httpSession_->closeWhenIdle();
1262  return buf.length();
1263  }));
1264  // We shouldn't get a third onIngress() callback. This will be enforced by the
1265  // test framework since the codec is a strict mock.
1266  EXPECT_CALL(*codec_, isBusy());
1267  EXPECT_CALL(*codec_, onIngressEOF());
1268  EXPECT_CALL(mockController_, detachSession(_));
1269 
1270  f();
1271  f(); // The first time wasn't processed, so this should make a len=2 chain.
1272  eventBase_.loop();
1273 }
1274 
1276  bool dropConnection) {
1278  MockHTTPHandler pushHandler;
1279 
1280  liveGoaways_ = true;
1281  if (doubleGoaway) {
1282  EXPECT_CALL(mockController_, getRequestHandler(_, _))
1283  .WillOnce(Return(&handler));
1284  EXPECT_CALL(handler, setTransaction(_))
1285  .WillOnce(SaveArg<0>(&handler.txn_));
1286 
1287  EXPECT_CALL(handler, onHeadersComplete(_));
1288  EXPECT_CALL(handler, onEOM())
1289  .WillOnce(Invoke([&] {
1290  handler.sendReply();
1291  }));
1292  EXPECT_CALL(*codec_, generateHeader(_, 1, _, _, _));
1293  EXPECT_CALL(*codec_, generateEOM(_, 1));
1294  EXPECT_CALL(handler, detachTransaction());
1295 
1296  // Turn on double GOAWAY drain
1297  codec_->enableDoubleGoawayDrain();
1298  }
1299 
1300  // Send a GOAWAY acking uninitiated transactions
1301  EXPECT_FALSE(drainPending_);
1302  httpSession_->notifyPendingShutdown();
1303  EXPECT_EQ(drainPending_, doubleGoaway);
1304  EXPECT_FALSE(reusable_);
1305 
1306  if (doubleGoaway) {
1307  // Should be able to process new requests
1308  auto req1 = makeGetRequest();
1309  codecCallback_->onMessageBegin(1, req1.get());
1310  codecCallback_->onHeadersComplete(1, std::move(req1));
1311  codecCallback_->onMessageComplete(1, false);
1312  }
1313 
1315  EXPECT_CALL(*transport_, writeChain(_, _, _))
1316  .WillOnce(Invoke([&] (folly::AsyncTransportWrapper::WriteCallback* callback,
1317  const shared_ptr<IOBuf>,
1318  WriteFlags) {
1319  // don't immediately flush the goaway
1320  cb = callback;
1321  }));
1322  if (doubleGoaway || !dropConnection) {
1323  // single goaway, drop connection doesn't get onIngressEOF
1324  EXPECT_CALL(*codec_, onIngressEOF());
1325  }
1326  eventBase_.loopOnce();
1327 
1328  EXPECT_CALL(mockController_, detachSession(_));
1329  if (dropConnection) {
1330  EXPECT_CALL(*transport_, closeWithReset())
1331  .Times(AtLeast(1))
1332  .WillOnce(DoAll(Assign(&transportGood_, false),
1333  Invoke([cb] {
1336  cb->writeErr(0, ex);
1337  })));
1338 
1339  httpSession_->dropConnection();
1340  } else {
1341  EXPECT_CALL(*codec_, isBusy());
1342  httpSession_->closeWhenIdle();
1343  cb->writeSuccess();
1344  }
1345  EXPECT_FALSE(drainPending_);
1346  EXPECT_FALSE(reusable_);
1347 }
1348 
1349 TEST_F(MockCodecDownstreamTest, SendDoubleGoawayTimeout) {
1350  testGoaway(true, true);
1351 }
1352 TEST_F(MockCodecDownstreamTest, SendDoubleGoawayIdle) {
1353  testGoaway(true, false);
1354 }
1355 TEST_F(MockCodecDownstreamTest, SendGoawayTimeout) {
1356  testGoaway(false, true);
1357 }
1359  testGoaway(false, false);
1360 }
1361 
1364  MockHTTPHandler pushHandler;
1365 
1366  liveGoaways_ = true;
1367 
1368  EXPECT_CALL(*codec_, onIngressEOF());
1369  EXPECT_CALL(mockController_, detachSession(_));
1370  EXPECT_CALL(*transport_, closeWithReset())
1371  .Times(AtLeast(1))
1372  .WillOnce(Assign(&transportGood_, false));
1373  httpSession_->dropConnection();
1374 }
1375 
// Drops the connection with the fixture flag liveGoaways_ cleared. The test
// expects the codec to see ingress EOF, the controller to be detached, and
// the transport to be closed through closeNow() at least once.
1376 TEST_F(MockCodecDownstreamTest, DropConnectionNogoaway) {
1378  MockHTTPHandler pushHandler;
1379 
1380  liveGoaways_ = false;
1381 
1382  EXPECT_CALL(*codec_, onIngressEOF());
1383  EXPECT_CALL(mockController_, detachSession(_));
      // The first closeNow() call marks the fake transport as no longer good.
1384  EXPECT_CALL(*transport_, closeNow())
1385  .Times(AtLeast(1))
1386  .WillOnce(Assign(&transportGood_, false));
1387  httpSession_->dropConnection();
1388 }
1389 
1390 TEST_F(MockCodecDownstreamTest, ShutdownThenError) {
1391  // Test that we ignore any errors after we shutdown the socket in HTTPSession.
1392  onIngressImpl([&] {
1393  // This executes as the implementation of HTTPCodec::onIngress()
1394  InSequence dummy;
1395 
1396  HTTPException err(HTTPException::Direction::INGRESS, "foo");
1397  err.setHttpStatusCode(400);
1398  HTTPMessage req = getGetRequest();
1399  EXPECT_CALL(mockController_, getParseErrorHandler(_, _, _))
1400  .WillOnce(Return(nullptr));
1401 
1402  // Creates and adds a txn to the session
1403  codecCallback_->onMessageBegin(1, &req);
1404 
1405  httpSession_->closeWhenIdle();
1406 
1407  codecCallback_->onError(1, err, false);
1408  });
1409  // flush the shutdown callback
1410  EXPECT_CALL(mockController_, detachSession(_));
1411  eventBase_.loopOnce();
1412 }
1413 
1414 TEST_F(MockCodecDownstreamTest, PingDuringShutdown) {
1415  onIngressImpl([&] {
1416  InSequence dummy;
1417 
1418  // Shutdown writes only. Since the session is empty, this normally
1419  // causes the session to close, but it is held open since we are in
1420  // the middle of parsing ingress.
1421  EXPECT_CALL(*codec_, isBusy());
1422  EXPECT_CALL(*codec_, onIngressEOF());
1423  httpSession_->closeWhenIdle();
1424 
1425  // We read a ping off the wire, which makes us enqueue a ping reply
1426  EXPECT_CALL(*codec_, generatePingReply(_, _))
1427  .WillOnce(Return(10));
1428  codecCallback_->onPingRequest(1);
1429 
1430  // When this function returns, the controller gets detachSession()
1431  EXPECT_CALL(mockController_, detachSession(_));
1432  });
1433 }
1434 
1436  EXPECT_CALL(*codec_, generateSettingsAck(_));
1437  codecCallback_->onSettings(
1438  {{SettingsId::INITIAL_WINDOW_SIZE, 4000}});
1439  EXPECT_CALL(*codec_, onIngressEOF());
1440  EXPECT_CALL(mockController_, detachSession(_));
1441  httpSession_->dropConnection();
1442 }
1443 
// Exercises HTTPSession::sendCertificateRequest() with a mocked secondary
// auth manager and codec, and checks that the size returned by the codec's
// generateCertificateRequest() (20) is what the session returns.
1444 TEST_F(MockCodecDownstreamTest, TestSendCertificateRequest) {
      // Inputs: a certificate request context and one extension encoding a
      // signature-algorithms list with a single scheme.
1445  auto certRequestContext = folly::IOBuf::copyBuffer("0123456789abcdef");
1446  fizz::SignatureAlgorithms sigAlgs;
1447  sigAlgs.supported_signature_algorithms.push_back(
1448  SignatureScheme::ecdsa_secp256r1_sha256);
1449  std::vector<fizz::Extension> extensions;
1450  extensions.push_back(encodeExtension(std::move(sigAlgs)));
1451 
      // Hand a mock auth manager to the session, then fetch it back so that
      // expectations can be set on it.
      // NOTE(review): the constructor argument line of secondAuthManager_ is
      // absent from this listing.
1452  std::unique_ptr<StrictMock<MockSecondaryAuthManager>> secondAuthManager_(
1454  httpSession_->setSecondAuthManager(std::move(secondAuthManager_));
1455  auto authManager = dynamic_cast<MockSecondaryAuthManager*>(
1456  httpSession_->getSecondAuthManager());
      // The codec is expected to be asked once for each settings table.
1457  EXPECT_CALL(*codec_, getIngressSettings())
1458  .WillOnce(Return(&kIngressCertAuthSettings));
1459  EXPECT_CALL(*codec_, getEgressSettings())
1460  .WillOnce(Return(&kEgressCertAuthSettings));
      // The mocked auth manager yields the pair (120, "authenticatorrequest").
1461  EXPECT_CALL(*authManager, createAuthRequest(_, _))
1462  .WillOnce(InvokeWithoutArgs([]() {
1463  return std::make_pair(120, IOBuf::copyBuffer("authenticatorrequest"));
1464  }));
1465  EXPECT_CALL(*codec_, generateCertificateRequest(_, _, _))
1466  .WillOnce(Return(20));
1467  auto encodedSize = httpSession_->sendCertificateRequest(
1468  std::move(certRequestContext), std::move(extensions));
1469  EXPECT_EQ(encodedSize, 20);
1470 
      // Tear the session down.
1471  EXPECT_CALL(*codec_, onIngressEOF());
1472  EXPECT_CALL(mockController_, detachSession(_));
1473  httpSession_->dropConnection();
1474 }
folly::HHWheelTimer::UniquePtr makeInternalTimeoutSet(EventBase *evb)
Definition: TestUtils.cpp:22
#define T(v)
Definition: http_parser.c:233
void setHttpStatusCode(uint32_t statusCode)
Definition: HTTPException.h:86
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
const uint32_t kInitialWindow
auto f
void timeoutExpired() noexceptoverride
GTEST_API_ Cardinality AtLeast(int n)
ProxygenError getProxygenError() const
Definition: Exception.h:50
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
void testConnFlowControlBlocked(bool timeout)
const SocketAddress peerAddr
Definition: TestUtils.cpp:20
bool hasCodecStatusCode() const
Definition: HTTPException.h:95
StreamCodecFactory stream
http2::PriorityUpdate getPriority() const
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
void onWriteChain(folly::AsyncTransportWrapper::WriteCallback *callback, std::shared_ptr< IOBuf >, WriteFlags)
HTTPDownstreamSession * httpSession_
STL namespace.
void sendReplyWithBody(uint32_t code, uint32_t content_length, bool keepalive=true, bool sendEOM=true, bool hasTrailers=false)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
bool isChained() const
Definition: IOBuf.h:760
const wangle::TransportInfo mockTransportInfo
Definition: TestUtils.cpp:18
std::unique_ptr< Codec > codec_
std::unique_ptr< HTTPMessage > makePostRequest(uint32_t contentLength)
Definition: TestUtils.cpp:124
tuple make_tuple()
Definition: gtest-tuple.h:675
PolymorphicAction< internal::InvokeWithoutArgsAction< FunctionImpl > > InvokeWithoutArgs(FunctionImpl function_impl)
internal::ReturnRefAction< R > ReturnRef(R &x)
void handler(int, siginfo_t *, void *)
std::unique_ptr< folly::IOBuf > makeBuf(uint32_t size)
Definition: ZlibTests.cpp:26
void writeBuf(const Buf &buf, folly::io::Appender &out)
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
void getLocalAddress(folly::SocketAddress &addr) const
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void testGoaway(bool doubleGoaway, bool dropConnection)
const HTTPSettings kIngressCertAuthSettings
std::size_t length() const
Definition: IOBuf.h:533
void dummy()
ErrorCode getCodecStatusCode() const
void sendHeaders(uint32_t code, uint32_t content_length, bool keepalive=true, HeaderMap headers=HeaderMap())
std::vector< SignatureScheme > supported_signature_algorithms
Definition: Extensions.h:17
StrictMock< MockController > mockController_
Definition: Actions.h:16
HTTPSettings kEgressCertAuthSettings
void fakeMockCodec(MockHTTPCodec &codec)
Definition: TestUtils.cpp:173
std::vector< folly::AsyncTransportWrapper::WriteCallback * > cbs_
void sendBody(uint32_t content_length)
std::unique_ptr< HHWheelTimer, Destructor > UniquePtr
Definition: HHWheelTimer.h:57
std::unique_ptr< HTTPMessage > makeResponse(uint16_t statusCode)
Definition: TestUtils.cpp:147
bool hasProxygenError() const
Definition: Exception.h:44
virtual HTTPTransaction * newPushedTransaction(HTTPPushTransactionHandler *handler)
folly::AsyncTransportWrapper::ReadCallback * transportCb_
StrictMock< MockHTTPCodec > * codec_
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void sendPushHeaders(const std::string &path, const std::string &host, uint32_t content_length, http2::PriorityUpdate pri)
HTTPMessage getGetRequest(const std::string &url)
Definition: TestUtils.cpp:76
const char * string
Definition: Conv.cpp:212
AsyncFizzClient::UniquePtr transport_
folly::HHWheelTimer::UniquePtr transactionTimeouts_
const SocketAddress localAddr
Definition: TestUtils.cpp:19
PolymorphicAction< internal::AssignAction< T1, T2 > > Assign(T1 *ptr, T2 val)
internal::DoBothAction< Action1, Action2 > DoAll(Action1 a1, Action2 a2)
std::unique_ptr< HTTPMessage > makeGetRequest()
Definition: TestUtils.cpp:98
#define EXPECT_CALL(obj, call)
uint64_t StreamID
Definition: HTTPCodec.h:49
const internal::AnythingMatcher _
virtual void writeSuccess() noexcept=0
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
Extension encodeExtension(const TokenBindingParameters &params)
Definition: Types.cpp:113
HTTPCodec::StreamID getID() const
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
const char * what(void) const noexceptoverride
Definition: Exception.cpp:26
const HTTPSettings kDefaultIngressSettings
NiceMock< MockAsyncTransport > * transport_
internal::ReturnAction< R > Return(R value)
TEST_F(AcceptorTest, TestCanAcceptWithNoConnectionCounter)