proxygen
HTTPSession.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 #include <chrono>
14 #include <folly/Conv.h>
15 #include <folly/CppAttributes.h>
16 #include <folly/Random.h>
25 #include <folly/io/Cursor.h>
27 
30 using folly::AsyncSocket;
33 using folly::WriteFlags;
36 using folly::IOBuf;
37 using folly::IOBufQueue;
40 using std::make_unique;
41 using std::pair;
42 using std::set;
43 using std::string;
44 using std::unique_ptr;
45 using std::vector;
46 
// File-local constants used by the HTTPSession implementation.
47 namespace {
// NOTE(review): kMinReadSize and kWriteReadyMax are not referenced in the
// visible part of this listing. Their names suggest a minimum read-buffer
// size and a cap on write-ready bytes -- confirm at the use sites.
48 static const uint32_t kMinReadSize = 1460;
49 static const uint32_t kWriteReadyMax = 65536;
50 
51 // Lower = higher latency, better prioritization
52 // Higher = lower latency, less prioritization
53 static const uint32_t kMaxWritesPerLoop = 32;
54 
// Labels handed to getEkm() when deriving the 4-byte value used for the
// certificate-authentication setting. The call sites pick one of the two
// labels based on isUpstream().
55 static constexpr folly::StringPiece kClientLabel =
56  "EXPORTER HTTP CERTIFICATE client";
57 static constexpr folly::StringPiece kServerLabel =
58  "EXPORTER HTTP CERTIFICATE server";
59 } // anonymous namespace
60 
61 namespace proxygen {
62 
64  HTTPSession* session,
65  uint64_t length)
66  : session_(session),
67  length_(length) {
68 }
69 
70 void
72  DCHECK(session_);
73  DCHECK(listHook.is_linked());
74  listHook.unlink();
75 }
76 
77 void
79  remove();
80  session_ = nullptr;
81 }
82 
83 void
85  // Unlink this write segment from the list before calling
86  // the session's onWriteSuccess() callback because, in the
87  // case where this is the last write for the connection,
88  // onWriteSuccess() looks for an empty write segment list
89  // as one of the criteria for shutting down the connection.
90  remove();
91 
92  // session_ should never be nullptr for a successful write
93  // The session is only cleared after a write error or timeout, and all
94  // AsyncTransport write failures are fatal. If session_ is nullptr at this
95  // point it means the AsyncTransport implementation is not failing
96  // subsequent writes correctly after an error.
98  delete this;
99 }
100 
101 void
103  const AsyncSocketException& ex) noexcept {
104  // After one segment fails to write, we clear the session_
105  // pointer in all subsequent write segments, so we ignore their
106  // writeError() callbacks.
107  if (session_) {
108  remove();
109  session_->onWriteError(bytesWritten, ex);
110  }
111  delete this;
112 }
113 
115  folly::HHWheelTimer* transactionTimeouts,
116  AsyncTransportWrapper::UniquePtr sock,
117  const SocketAddress& localAddr,
118  const SocketAddress& peerAddr,
119  HTTPSessionController* controller,
120  unique_ptr<HTTPCodec> codec,
121  const TransportInfo& tinfo,
122  InfoCallback* infoCallback):
123  HTTPSession(WheelTimerInstance(transactionTimeouts), std::move(sock),
124  localAddr, peerAddr, controller, std::move(codec),
125  tinfo, infoCallback) {
126 }
127 
129  const WheelTimerInstance& timeout,
130  AsyncTransportWrapper::UniquePtr sock,
131  const SocketAddress& localAddr,
132  const SocketAddress& peerAddr,
133  HTTPSessionController* controller,
134  unique_ptr<HTTPCodec> codec,
135  const TransportInfo& tinfo,
136  InfoCallback* infoCallback):
137  HTTPSessionBase(localAddr, peerAddr, controller, tinfo, infoCallback,
138  std::move(codec)),
139  writeTimeout_(this),
141  WheelTimerInstance(timeout) :
143  sock_(std::move(sock)),
144  timeout_(timeout),
145  draining_(false),
146  started_(false),
147  writesDraining_(false),
149  ingressError_(false),
150  flowControlTimeout_(this),
151  drainTimeout_(this),
152  reads_(SocketState::PAUSED),
153  writes_(SocketState::UNPAUSED),
154  ingressUpgraded_(false),
155  resetSocketOnShutdown_(false),
156  inLoopCallback_(false),
157  inResume_(false),
158  pendingPause_(false) {
159  byteEventTracker_ = std::make_shared<ByteEventTracker>(this);
162 
163  codec_.add<HTTPChecks>();
164 
165  setupCodec();
166 
168 
169  if (infoCallback_) {
170  infoCallback_->onCreate(*this);
171  }
172 
173  auto controllerPtr = getController();
174  if (controllerPtr) {
176  controllerPtr->getSessionFlowControlTimeout());
177  }
179 
180  if (!sock_->isReplaySafe()) {
181  sock_->setReplaySafetyCallback(this);
182  }
183 }
184 
186  uint32_t certAuthSettingVal = 0;
187  constexpr uint16_t settingLen = 4;
188  std::unique_ptr<folly::IOBuf> ekm;
190  if (isUpstream()) {
191  label = kClientLabel;
192  } else {
193  label = kServerLabel;
194  }
195  auto fizzBase = getTransport()->getUnderlyingTransport<AsyncFizzBase>();
196  if (fizzBase) {
197  ekm = fizzBase->getEkm(label, nullptr, settingLen);
198  } else {
199  VLOG(4) << "Underlying transport does not support secondary "
200  "authentication.";
201  return certAuthSettingVal;
202  }
203  if (ekm && ekm->computeChainDataLength() == settingLen) {
204  folly::io::Cursor cursor(ekm.get());
205  uint32_t ekmVal = cursor.readBE<uint32_t>();
206  certAuthSettingVal = (ekmVal & 0x3fffffff) | 0x80000000;
207  }
208  return certAuthSettingVal;
209 }
210 
212  uint32_t certAuthSettingVal = 0;
213  constexpr uint16_t settingLen = 4;
214  std::unique_ptr<folly::IOBuf> ekm;
216  if (isUpstream()) {
217  label = kServerLabel;
218  } else {
219  label = kClientLabel;
220  }
221  auto fizzBase = getTransport()->getUnderlyingTransport<AsyncFizzBase>();
222  if (fizzBase) {
223  ekm = fizzBase->getEkm(label, nullptr, settingLen);
224  } else {
225  VLOG(4) << "Underlying transport does not support secondary "
226  "authentication.";
227  return false;
228  }
229  if (ekm && ekm->computeChainDataLength() == settingLen) {
230  folly::io::Cursor cursor(ekm.get());
231  uint32_t ekmVal = cursor.readBE<uint32_t>();
232  certAuthSettingVal = (ekmVal & 0x3fffffff) | 0x80000000;
233  } else {
234  return false;
235  }
236  if (certAuthSettingVal == value) {
237  return true;
238  } else {
239  return false;
240  }
241 }
242 
245  // until we support upstream pipelining
248  }
249 
250  // If a secondary authentication manager is configured for this session, set
251  // the SETTINGS_HTTP_CERT_AUTH to indicate support for HTTP-layer certificate
252  // authentication.
253  uint32_t certAuthSettingVal = 0;
254  if (secondAuthManager_) {
255  certAuthSettingVal = getCertAuthSettingVal();
256  }
258  if (settings) {
261  if (certAuthSettingVal != 0) {
263  certAuthSettingVal);
264  }
265  }
267 
270  codec_.addFilters(std::unique_ptr<FlowControlFilter>(connFlowControl_));
271  // if we really support switching from spdy <-> h2, we need to update
272  // existing flow control filter
273  }
274 
275  codec_.setCallback(this);
276 }
277 
279  VLOG(4) << *this << " closing";
280 
281  CHECK(transactions_.empty());
283  CHECK(txnEgressQueue_.empty());
284  DCHECK(!sock_->getReadCallback());
285 
286  if (writeTimeout_.isScheduled()) {
288  }
289 
292  }
293 
295 }
296 
298  CHECK(!started_);
299  started_ = true;
301  if (connFlowControl_) {
304  }
305  // For HTTP/2 if we are currently draining it means we got notified to
306  // shutdown before we sent a SETTINGS frame, so we defer sending a GOAWAY
307  // until we've started and sent SETTINGS.
308  if (draining_) {
312  }
313  scheduleWrite();
314  resumeReads();
315 }
316 
318  std::shared_ptr<ByteEventTracker> byteEventTracker) {
319  if (byteEventTracker && byteEventTracker_) {
320  byteEventTracker->absorb(std::move(*byteEventTracker_));
321  }
322  byteEventTracker_ = byteEventTracker;
323  if (byteEventTracker_) {
324  byteEventTracker_->setCallback(this);
325  byteEventTracker_->setTTLBAStats(sessionStats_);
326  }
327 }
328 
331  if (byteEventTracker_) {
332  byteEventTracker_->setTTLBAStats(stats);
333  }
334 }
335 
// Records the receive-side flow-control window sizes for this session.
//
// Must be called before the session has started (enforced by
// CHECK(!started_)). Each argument is stored in the member of the same name,
// and the session window size is additionally applied as the read buffer
// limit through HTTPSessionBase::setReadBufferLimit().
//
// @param initialReceiveWindow      stored in initialReceiveWindow_
// @param receiveStreamWindowSize   stored in receiveStreamWindowSize_
// @param receiveSessionWindowSize  stored in receiveSessionWindowSize_ and
//                                  used as the read buffer limit
//
// NOTE(review): listing lines 344 and 346-347 are absent from this extract,
// so where `settings` comes from and what the `if (settings)` branch does
// are not visible here.
336 void HTTPSession::setFlowControl(size_t initialReceiveWindow,
337  size_t receiveStreamWindowSize,
338  size_t receiveSessionWindowSize) {
339  CHECK(!started_);
340  initialReceiveWindow_ = initialReceiveWindow;
341  receiveStreamWindowSize_ = receiveStreamWindowSize;
342  receiveSessionWindowSize_ = receiveSessionWindowSize;
343  HTTPSessionBase::setReadBufferLimit(receiveSessionWindowSize);
345  if (settings) {
348  }
349 }
350 
352  VLOG_IF(4, started_) << "Must flush egress settings to peer";
354  if (settings) {
355  for (const auto& setting: inSettings) {
356  settings->setSetting(setting.id, setting.value);
357  }
358  }
359 }
360 
362  CHECK(!started_);
366  if (settings) {
369  }
370  }
371 }
372 
374  CHECK(!started_);
375  egressBytesLimit_ = bytesLimit;
376 }
377 
378 void
380  VLOG(3) << "session-level timeout on " << *this;
381 
382  if (liveTransactions_ != 0) {
383  // There's at least one open transaction with a read timeout scheduled.
384  // We got here because the session timeout == the transaction timeout.
385  // Ignore, since the transaction is going to timeout very soon.
386  VLOG(4) << *this <<
387  "ignoring session timeout, transaction timeout imminent";
388  resetTimeout();
389  return;
390  }
391 
392  if (!transactions_.empty()) {
393  // There are one or more transactions, but none of them are live.
394  // That's valid if they've all received their full ingress messages
395  // and are waiting for their Handlers to process those messages.
396  VLOG(4) << *this <<
397  "ignoring session timeout, no transactions awaiting reads";
398  resetTimeout();
399  return;
400  }
401 
402  VLOG(4) << *this << " Timeout with nothing pending";
403 
405  auto controller = getController();
406  if (controller) {
408  controller->getGracefulShutdownTimeout());
409  }
411 }
412 
413 void
415  VLOG(4) << "Write timeout for " << *this;
416 
417  CHECK(!pendingWrites_.empty());
418  DestructorGuard g(this);
419 
422 }
423 
424 void
426  VLOG(4) << "Flow control timeout for " << *this;
427 
428  DestructorGuard g(this);
429 
431  shutdownTransport(true, true);
432 }
433 
// Streams a one-line description of this session into `os`.
//
// The output always starts with "proto=" followed by the codec's protocol
// string. When isDownstream() is true it then appends the codec's user agent
// ("UA="), the peer address (prefixed "downstream=") and the local address
// (suffixed "=local"). Otherwise it appends the local address (prefixed
// "local=") followed by the peer address (suffixed "=upstream").
//
// @param os  Output stream that receives the description.
434 void
435 HTTPSession::describe(std::ostream& os) const {
436  os << "proto=" << getCodecProtocolString(codec_->getProtocol());
437  if (isDownstream()) {
    // Downstream: peer address first, then local.
438  os << ", UA=" << codec_->getUserAgent()
439  << ", downstream=" << getPeerAddress() << ", " << getLocalAddress()
440  << "=local";
441  } else {
    // Not downstream: local address first, then peer.
442  os << ", local=" << getLocalAddress() << ", " << getPeerAddress()
443  << "=upstream";
444  }
445 }
446 
447 bool
449  return !transactions_.empty() || codec_->isBusy();
450 }
451 
452 void
454  scheduleWrite();
455 }
456 
457 void
459  VLOG(4) << *this << " notified pending shutdown";
460  drain();
461 }
462 
463 void
465  // If drain() already called, this is a noop
466  drain();
467  // Generate the second GOAWAY now. No-op if second GOAWAY already sent.
471  scheduleWrite();
472  }
473  if (!isBusy() && !hasMoreWrites()) {
474  // if we're already idle, close now
475  dropConnection();
476  }
477 }
478 
480  if (isLoopCallbackScheduled()) {
482  }
483  if (shutdownTransportCb_) {
484  shutdownTransportCb_.reset();
485  }
486  // checkForShutdown only closes the connection if these conditions are true
487  DCHECK(writesShutdown());
488  DCHECK(transactions_.empty());
490 }
491 
492 void
494  VLOG(4) << "dropping " << *this;
495  if (!sock_ || (readsShutdown() && writesShutdown())) {
496  VLOG(4) << *this << " already shutdown";
497  return;
498  }
499 
501  if (transactions_.empty() && !hasMoreWrites()) {
502  DestructorGuard dg(this);
503  shutdownTransport(true, true);
504  // shutdownTransport might have generated a write (goaway)
505  // If so, writes will not be shutdown, so fall through to
506  // shutdownTransportWithReset.
507  if (readsShutdown() && writesShutdown()) {
509  return;
510  }
511  }
513 }
514 
516 
519 }
520 
523 }
524 
// Hands back a buffer for the next read through the two out-parameters.
//
// @param buf      receives readSpace.first (pointer to the writable region)
// @param bufSize  receives readSpace.second (size of that region)
//
// NOTE(review): listing line 529, which initializes `readSpace`, is absent
// from this extract. readDataAvailable() calls readBuf_.postallocate(), so
// the space presumably comes from a matching preallocate on readBuf_ --
// confirm against the full source.
525 void
526 HTTPSession::getReadBuffer(void** buf, size_t* bufSize) {
527  FOLLY_SCOPED_TRACE_SECTION("HTTPSession - getReadBuffer");
528  pair<void*,uint32_t> readSpace =
530  *buf = readSpace.first;
531  *bufSize = readSpace.second;
532 }
533 
534 void
537  "HTTPSession - readDataAvailable", "readSize", readSize);
538  VLOG(10) << "read completed on " << *this << ", bytes=" << readSize;
539 
540  DestructorGuard dg(this);
541  resetTimeout();
542  readBuf_.postallocate(readSize);
543 
544  if (infoCallback_) {
545  infoCallback_->onRead(*this, readSize);
546  }
547 
548  processReadData();
549 }
550 
551 bool
553  return true;
554 }
555 
556 void
558  size_t readSize = readBuf->computeChainDataLength();
560  "HTTPSession - readBufferAvailable", "readSize", readSize);
561  VLOG(5) << "read completed on " << *this << ", bytes=" << readSize;
562 
563  DestructorGuard dg(this);
564  resetTimeout();
566 
567  if (infoCallback_) {
568  infoCallback_->onRead(*this, readSize);
569  }
570 
571  processReadData();
572 }
573 
574 void
576  FOLLY_SCOPED_TRACE_SECTION("HTTPSession - processReadData");
577  // skip any empty IOBufs before feeding CODEC.
578  while (readBuf_.front() != nullptr && readBuf_.front()->length() == 0) {
580  }
581 
582  // Pass the ingress data through the codec to parse it. The codec
583  // will invoke various methods of the HTTPSession as callbacks.
584  const IOBuf* currentReadBuf;
585  // It's possible for the last buffer in a chain to be empty here.
586  // AsyncTransport saw fd activity so asked for a read buffer, but it was
587  // SSL traffic and not enough to decrypt a whole record. Later we invoke
588  // this function from the loop callback.
589  while (!ingressError_ &&
590  readsUnpaused() &&
591  ((currentReadBuf = readBuf_.front()) != nullptr &&
592  currentReadBuf->length() != 0)) {
593  // We're about to parse, make sure the parser is not paused
594  codec_->setParserPaused(false);
595  size_t bytesParsed = codec_->onIngress(*currentReadBuf);
596  if (bytesParsed == 0) {
597  // If the codec didn't make any progress with current input, we
598  // better get more.
599  break;
600  }
601  readBuf_.trimStart(bytesParsed);
602  }
603 }
604 
605 void
607  DestructorGuard guard(this);
608  VLOG(4) << "EOF on " << *this;
609  // for SSL only: error without any bytes from the client might happen
610  // due to client-side issues with the SSL cert. Note that it can also
611  // happen if the client sends a SPDY frame header but no body.
612  if (infoCallback_
613  && transportInfo_.secure && getNumTxnServed() == 0 && readBuf_.empty()) {
615  }
616 
617  // Shut down reads, and also shut down writes if there are no
618  // transactions. (If there are active transactions, leave the
619  // write side of the socket open so those transactions can
620  // finish generating responses.)
622  shutdownTransport(true, transactions_.empty());
623 }
624 
625 void
627  DestructorGuard guard(this);
628  VLOG(4) << "read error on " << *this << ": " << ex.what();
629 
630  auto sslEx = dynamic_cast<const folly::SSLException*>(&ex);
631  if (infoCallback_ && sslEx) {
632  if (sslEx->getSSLError() == folly::SSLError::CLIENT_RENEGOTIATION) {
634  }
635  }
636 
637  // We're definitely finished reading. Don't close the write side
638  // of the socket if there are outstanding transactions, though.
639  // Instead, give the transactions a chance to produce any remaining
640  // output.
641  if (sslEx && sslEx->getSSLError() == folly::SSLError::SSL_ERROR) {
642  transportInfo_.sslError = ex.what();
643  }
645  shutdownTransport(true, transactions_.empty(), ex.what());
646 }
647 
650  HTTPCodec::StreamID assocStreamId,
653  return nullptr;
654  }
655  CHECK(isDownstream());
656  CHECK_NOTNULL(handler);
658  // This session doesn't support any more push transactions
659  // This could be an actual problem - since a single downstream SPDY session
660  // might be connected to N upstream hosts, each of which send M pushes,
661  // which exceeds the limit.
662  // should we queue?
663  return nullptr;
664  }
665 
667  assocStreamId,
669  if (!txn) {
670  return nullptr;
671  }
672 
673  DestructorGuard dg(this);
674  auto txnID = txn->getID();
675  txn->setHandler(handler);
677  return txn;
678 }
679 
683  HTTPCodec::StreamID controlStream,
684  bool unidirectional) noexcept {
685  CHECK(handler && controlStream > 0);
686  auto eSettings = codec_->getEgressSettings();
687  if (!eSettings || !eSettings->getSetting(SettingsId::ENABLE_EX_HEADERS, 0)) {
689  << " does not support ExTransaction";
690  return nullptr;
691  }
693  LOG(ERROR) << "cannot support any more transactions in " << *this;
694  return nullptr;
695  }
696 
697  DCHECK(started_);
698  HTTPTransaction* txn =
701  HTTPCodec::ExAttributes(controlStream, unidirectional));
702  if (!txn) {
703  return nullptr;
704  }
705 
706  DestructorGuard dg(this);
707  txn->setHandler(handler);
709  return txn;
710 }
711 
714  if (settings) {
717  }
718  return codec_->getDefaultWindowSize();
719 }
720 
721 void
723  if (!egressLimitExceeded()) {
724  return;
725  }
726 
727  auto txn = findTransaction(streamID);
728  if (txn) {
729  // If writes are paused, start this txn off in the egress paused state
730  VLOG(4) << *this << " starting streamID=" << txn->getID()
731  << " egress paused, numActiveWrites_=" << numActiveWrites_;
732  txn->pauseEgress();
733  }
734 }
735 
739 
740  // if HTTP2 priorities are enabled, get them from the message
741  // and ignore otherwise
742  if (getHTTP2PrioritiesEnabled() && msg) {
743  auto res = msg->getHTTP2Priority();
744  if (res) {
745  h2Pri.streamDependency = std::get<0>(*res);
746  h2Pri.exclusive = std::get<1>(*res);
747  h2Pri.weight = std::get<2>(*res);
748  } else {
749  // HTTPMessage with setPriority called explicitly
750  h2Pri.streamDependency =
752  }
753  }
754  return h2Pri;
755 }
756 
757 void
759  VLOG(4) << "processing new msg streamID=" << streamID << " " << *this;
760  if (infoCallback_) {
762  }
763 
764  HTTPTransaction* txn = findTransaction(streamID);
765  if (txn) {
766  if (isDownstream() && txn->isPushed()) {
767  // Push streams are unidirectional (half-closed). If the downstream
768  // attempts to send ingress, abort with STREAM_CLOSED error.
770  "Downstream attempts to send ingress, abort.");
772  txn->onError(ex);
773  }
774  return; // If this transaction is already registered, no need to add it now
775  }
776 
777  http2::PriorityUpdate messagePriority = getMessagePriority(msg);
778  txn = createTransaction(streamID, HTTPCodec::NoStream,
779  HTTPCodec::NoExAttributes, messagePriority);
780  if (!txn) {
781  return; // This could happen if the socket is bad.
782  }
783 
785  // The previous transaction hasn't completed yet. Pause reads until
786  // it completes; this requires pausing both transactions.
787 
788  // HTTP/1.1 pipelining is detected, which is incompatible with
789  // ByteEventTracker. Drain all the ByteEvents.
790  CHECK(byteEventTracker_);
791  byteEventTracker_->drainByteEvents();
792 
793  // drainByteEvents() may detach txn(s). Don't pause read if one txn left
794  if (getPipelineStreamCount() < 2) {
795  DCHECK(readsUnpaused());
796  return;
797  }
798 
799  // There must be at least two transactions (we just checked). The previous
800  // txns haven't completed yet. Pause reads until they complete
801  DCHECK_GE(transactions_.size(), 2);
802  for (auto it = ++transactions_.rbegin(); it != transactions_.rend(); ++it) {
803  DCHECK(it->second.isIngressEOMSeen());
804  it->second.pauseIngress();
805  }
806  transactions_.rbegin()->second.pauseIngress();
807  DCHECK_EQ(liveTransactions_, 0);
808  DCHECK(readsPaused());
809  }
810 }
811 
812 void
814  HTTPCodec::StreamID assocStreamID,
815  HTTPMessage* msg) {
816  VLOG(4) << "processing new push promise streamID=" << streamID
817  << " on assocStreamID=" << assocStreamID << " " << *this;
818  if (infoCallback_) {
820  }
821  if (assocStreamID == 0) {
822  VLOG(2) << "push promise " << streamID << " should be associated with "
823  << "an active stream=" << assocStreamID << " " << *this;
825  return;
826  }
827 
828  if (isDownstream()) {
829  VLOG(2) << "push promise cannot be sent to upstream " << *this;
831  return;
832  }
833 
834  HTTPTransaction* assocTxn = findTransaction(assocStreamID);
835  if (!assocTxn || assocTxn->isIngressEOMSeen()) {
836  VLOG(2) << "cannot find the assocTxn=" << assocTxn
837  << ", or assoc stream is already closed by upstream" << *this;
839  return;
840  }
841 
842  http2::PriorityUpdate messagePriority = getMessagePriority(msg);
843  auto txn = createTransaction(streamID, assocStreamID,
844  HTTPCodec::NoExAttributes, messagePriority);
845  if (!txn) {
846  return; // This could happen if the socket is bad.
847  }
848 
849  if (!assocTxn->onPushedTransaction(txn)) {
850  VLOG(1) << "Failed to add pushed txn " << streamID
851  << " to assoc txn " << assocStreamID << " on " << *this;
853  folly::to<std::string>("Failed to add pushed transaction ", streamID));
855  onError(streamID, ex, true);
856  }
857 }
858 
859 void
861  HTTPCodec::StreamID controlStream,
862  bool unidirectional,
863  HTTPMessage* msg) {
864  VLOG(4) << "processing new ExMessage=" << streamID
865  << " on controlStream=" << controlStream << ", " << *this;
866  if (infoCallback_) {
868  }
869  if (controlStream == 0) {
870  LOG(ERROR) << "ExMessage=" << streamID << " should have an active control "
871  << "stream=" << controlStream << ", " << *this;
873  return;
874  }
875 
876  HTTPTransaction* controlTxn = findTransaction(controlStream);
877  if (!controlTxn) {
878  // control stream is broken, or remote sends a bogus stream id
879  LOG(ERROR) << "no control stream=" << controlStream << ", " << *this;
880  return;
881  }
882 
883  http2::PriorityUpdate messagePriority = getMessagePriority(msg);
884  auto txn = createTransaction(streamID,
886  HTTPCodec::ExAttributes(controlStream,
887  unidirectional),
888  messagePriority);
889  if (!txn) {
890  return; // This could happen if the socket is bad.
891  }
892  // control stream may be paused if the upstream is not ready yet
893  if (controlTxn->isIngressPaused()) {
894  txn->pauseIngress();
895  }
896 }
897 
898 void
900  unique_ptr<HTTPMessage> msg) {
901  // The codec's parser detected the end of an ingress message's
902  // headers.
903  VLOG(4) << "processing ingress headers complete for " << *this <<
904  ", streamID=" << streamID;
905 
906  if (!codec_->isReusable()) {
908  }
909 
910  if (infoCallback_) {
911  infoCallback_->onIngressMessage(*this, *msg.get());
912  }
913  HTTPTransaction* txn = findTransaction(streamID);
914  if (!txn) {
915  invalidStream(streamID);
916  return;
917  }
918 
919  const char* sslCipher =
920  transportInfo_.sslCipher ? transportInfo_.sslCipher->c_str() : nullptr;
921  msg->setSecureInfo(transportInfo_.sslVersion, sslCipher);
922  msg->setSecure(transportInfo_.secure);
923 
924  auto controlStreamID = txn->getControlStream();
925  if (controlStreamID) {
926  auto controlTxn = findTransaction(*controlStreamID);
927  if (!controlTxn) {
928  VLOG(2) << "txn=" << streamID << " with a broken controlTxn="
929  << *controlStreamID << " " << *this;
930  HTTPException ex(
932  folly::to<std::string>("broken controlTxn ", *controlStreamID));
933  onError(streamID, ex, true);
934  return;
935  }
936 
937  // Call onExTransaction() only for requests.
938  if (txn->isRemoteInitiated() && !controlTxn->onExTransaction(txn)) {
939  VLOG(2) << "Failed to add exTxn=" << streamID
940  << " to controlTxn=" << *controlStreamID << ", " << *this;
941  HTTPException ex(
943  folly::to<std::string>("Fail to add exTxn ", streamID));
945  onError(streamID, ex, true);
946  return;
947  }
948  } else {
949  setupOnHeadersComplete(txn, msg.get());
950  }
951 
952  // The txn may have already been aborted by the handler.
953  // Verify that the txn still exists before ingress callbacks.
954  txn = findTransaction(streamID);
955  if (!txn) {
956  return;
957  }
958 
959  if (!txn->getHandler()) {
960  txn->sendAbort();
961  return;
962  }
963 
964  // Tell the Transaction to start processing the message now
965  // that the full ingress headers have arrived.
966  txn->onIngressHeadersComplete(std::move(msg));
967 }
968 
969 void
971  unique_ptr<IOBuf> chain, uint16_t padding) {
972  FOLLY_SCOPED_TRACE_SECTION("HTTPSession - onBody");
973  DestructorGuard dg(this);
974  // The codec's parser detected part of the ingress message's
975  // entity-body.
976  uint64_t length = chain->computeChainDataLength();
977  HTTPTransaction* txn = findTransaction(streamID);
978  if (!txn) {
979  if (connFlowControl_ &&
981  scheduleWrite();
982  }
983  invalidStream(streamID);
984  return;
985  }
986 
987  if (HTTPSessionBase::onBodyImpl(std::move(chain), length, padding, txn)) {
988  VLOG(4) << *this << " pausing due to read limit exceeded.";
989  pauseReads();
990  }
991 }
992 
994  size_t length) {
995  // The codec's parser detected a chunk header (meaning that this
996  // connection probably is HTTP/1.1).
997  //
998  // After calling onChunkHeader(), the codec will call onBody() zero
999  // or more times and then call onChunkComplete().
1000  //
1001  // The reason for this callback on the chunk header is to support
1002  // an optimization. In general, the job of the codec is to present
1003  // the HTTPSession with an abstract view of a message,
1004  // with all the details of wire formatting hidden. However, there's
1005  // one important case where we want to know about chunking: reverse
1006  // proxying where both the client and server streams are HTTP/1.1.
1007  // In that scenario, we preserve the server's chunk boundaries when
1008  // sending the response to the client, in order to avoid possibly
1009  // making the egress packetization worse by rechunking.
1010  HTTPTransaction* txn = findTransaction(streamID);
1011  if (!txn) {
1012  invalidStream(streamID);
1013  return;
1014  }
1015  txn->onIngressChunkHeader(length);
1016 }
1017 
1019  // The codec's parser detected the end of the message body chunk
1020  // associated with the most recent call to onChunkHeader().
1021  HTTPTransaction* txn = findTransaction(streamID);
1022  if (!txn) {
1023  invalidStream(streamID);
1024  return;
1025  }
1026  txn->onIngressChunkComplete();
1027 }
1028 
1029 void
1031  unique_ptr<HTTPHeaders> trailers) {
1032  HTTPTransaction* txn = findTransaction(streamID);
1033  if (!txn) {
1034  invalidStream(streamID);
1035  return;
1036  }
1037  txn->onIngressTrailers(std::move(trailers));
1038 }
1039 
1040 void
1042  bool upgrade) {
1043  DestructorGuard dg(this);
1044  // The codec's parser detected the end of the ingress message for
1045  // this transaction.
1046  VLOG(4) << "processing ingress message complete for " << *this <<
1047  ", streamID=" << streamID;
1048  HTTPTransaction* txn = findTransaction(streamID);
1049  if (!txn) {
1050  invalidStream(streamID);
1051  return;
1052  }
1053 
1054  if (upgrade) {
1055  /* Send the upgrade callback to the transaction and the handler.
1056  * Currently we support upgrades for only HTTP sessions and not SPDY
1057  * sessions.
1058  */
1059  ingressUpgraded_ = true;
1061  return;
1062  }
1063 
1064  // txnIngressFinished = !1xx response
1065  const bool txnIngressFinished =
1066  txn->isDownstream() || !txn->extraResponseExpected();
1067  if (txnIngressFinished) {
1068  decrementTransactionCount(txn, true, false);
1069  }
1070  txn->onIngressEOM();
1071 
1072  // The codec knows, based on the semantics of whatever protocol it
1073  // supports, whether it's valid for any more ingress messages to arrive
1074  // after this one. For example, an HTTP/1.1 request containing
1075  // "Connection: close" indicates the end of the ingress, whereas a
1076  // SPDY session generally can handle more messages at any time.
1077  //
1078  // If the connection is not reusable, we close the read side of it
1079  // but not the write side. There are two reasons why more writes
1080  // may occur after this point:
1081  // * If there are previous writes buffered up in the pendingWrites_
1082  // queue, we need to attempt to complete them.
1083  // * The Handler associated with the transaction may want to
1084  // produce more egress data when the ingress message is fully
1085  // complete. (As a common example, an application that handles
1086  // form POSTs may not be able to even start generating a response
1087  // until it has received the full request body.)
1088  //
1089  // There may be additional checks that need to be performed that are
1090  // specific to requests or responses, so we call the subclass too.
1091  if (!codec_->isReusable() &&
1092  txnIngressFinished &&
1094  VLOG(4) << *this << " cannot reuse ingress";
1095  shutdownTransport(true, false);
1096  }
1097 }
1098 
1100  const HTTPException& error, bool newTxn) {
1101  DestructorGuard dg(this);
1102  // The codec detected an error in the ingress stream, possibly bad
1103  // syntax, a truncated message, or bad semantics in the frame. If reads
1104  // are paused, queue up the event; otherwise, process it now.
1105  VLOG(4) << "Error on " << *this << ", streamID=" << streamID
1106  << ", " << error;
1107 
1108  if (ingressError_) {
1109  return;
1110  }
1111  if (!codec_->supportsParallelRequests()) {
1112  // this error should only prevent us from reading/handling more errors
1113  // on serial streams
1114  ingressError_ = true;
1116  }
1117  if ((streamID == 0) && infoCallback_) {
1119  }
1120 
1121  if (!streamID) {
1122  ingressError_ = true;
1123  onSessionParseError(error);
1124  return;
1125  }
1126 
1127  HTTPTransaction* txn = findTransaction(streamID);
1128  if (!txn) {
1129  if (error.hasHttpStatusCode() && streamID != 0) {
1130  // If the error has an HTTP code, then parsing was fine, it just was
1131  // illegal in a higher level way
1132  txn = createTransaction(streamID, HTTPCodec::NoStream,
1134  if (infoCallback_) {
1135  infoCallback_->onRequestBegin(*this);
1136  }
1137  if (txn) {
1138  handleErrorDirectly(txn, error);
1139  }
1140  } else if (newTxn) {
1141  onNewTransactionParseError(streamID, error);
1142  } else {
1143  VLOG(4) << *this << " parse error with invalid transaction";
1144  invalidStream(streamID);
1145  }
1146  return;
1147  }
1148 
1149  if (!txn->getHandler() &&
1150  txn->getEgressState() == HTTPTransactionEgressSM::State::Start) {
1151  handleErrorDirectly(txn, error);
1152  return;
1153  }
1154 
1155  txn->onError(error);
1156  if (!codec_->isReusable() && transactions_.empty()) {
1157  VLOG(4) << *this << "shutdown from onError";
1159  shutdownTransport(true, true);
1160  }
1161 }
1162 
1164  ErrorCode code) {
1165  VLOG(4) << "stream abort on " << *this << ", streamID=" << streamID
1166  << ", code=" << getErrorCodeString(code);
1167  HTTPTransaction* txn = findTransaction(streamID);
1168  if (!txn) {
1169  VLOG(4) << *this << " abort for unrecognized transaction, streamID= "
1170  << streamID;
1171  return;
1172  }
1174  folly::to<std::string>("Stream aborted, streamID=",
1175  streamID, ", code=", getErrorCodeString(code)));
1177  ex.setCodecStatusCode(code);
1178  DestructorGuard dg(this);
1179  if (isDownstream() && !txn->getAssocTxnId() && code == ErrorCode::CANCEL) {
1180  // Cancelling the assoc txn cancels all push txns
1181  for (auto it = txn->getPushedTransactions().begin();
1182  it != txn->getPushedTransactions().end(); ) {
1183  auto pushTxn = findTransaction(*it);
1184  ++it;
1185  DCHECK(pushTxn != nullptr);
1186  pushTxn->onError(ex);
1187  }
1188  }
1189  auto exTxns = txn->getExTransactions();
1190  for (auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1191  auto exTxn = findTransaction(*it);
1192  if (exTxn) {
1193  exTxn->onError(ex);
1194  }
1195  }
1196  txn->onError(ex);
1197 }
1198 
// Handles a GOAWAY received from the peer.
//
// Drains the session, then collects the transactions whose stream ID parity
// matches this side (odd IDs when isUpstream() is true) and whose ID is
// greater than lastGoodStreamID. The first such stream is handled separately
// from the rest: if `code` is not NO_ERROR it is failed directly through
// errorOnTransactionId() with a message that includes the codec error (and
// debug data, if any); otherwise it is simply appended to `ids` with the
// others.
//
// @param lastGoodStreamID  streams with a larger ID are candidates for abort
// @param code              error code carried by the GOAWAY
// @param debugData         optional debug payload; may be null
//
// NOTE(review): listing lines 1205, 1213, 1242, 1245 and 1256 are absent from
// this extract. The declarations of `err` and `ex`, the call that notifies
// transactions of the GOAWAY, and whatever consumes `ids` at the end are
// therefore not visible here.
1199 void HTTPSession::onGoaway(uint64_t lastGoodStreamID,
1200  ErrorCode code,
1201  std::unique_ptr<folly::IOBuf> debugData) {
    // Keep the session alive for the duration of this callback.
1202  DestructorGuard g(this);
1203  VLOG(4) << "GOAWAY on " << *this << ", code=" << getErrorCodeString(code);
1204 
1206 
1207  // Drain active transactions and prevent new transactions
1208  drain();
1209 
1210  // We give the less-forceful onGoaway() first so that transactions have
1211  // a chance to do stat tracking before potentially getting a forceful
1212  // onError().
1214 
1215  // Abort transactions which have been initiated but not created
1216  // successfully at the remote end. Upstream transactions are created
1217  // with odd transaction IDs and downstream transactions with even IDs.
1218  vector<HTTPCodec::StreamID> ids;
1219  auto firstStream = HTTPCodec::NoStream;
1220 
1221  for (const auto& txn: transactions_) {
1222  auto streamID = txn.first;
    // Parity test selects streams on this side's ID space (odd when
    // upstream); only those beyond lastGoodStreamID are affected.
1223  if (((bool)(streamID & 0x01) == isUpstream()) &&
1224  (streamID > lastGoodStreamID)) {
1225  if (firstStream == HTTPCodec::NoStream) {
1226  // transactions_ is keyed by stream id and should iterate in sorted
1227  // order. We will defer adding the firstStream to the id list until
1228  // we can determine whether we have a codec error code.
1229  firstStream = streamID;
1230  continue;
1231  }
1232 
1233  ids.push_back(streamID);
1234  }
1235  }
1236 
1237 
1238  if (firstStream != HTTPCodec::NoStream && code != ErrorCode::NO_ERROR) {
1239  // If we get a codec error, we will attempt to blame the first stream
1240  // by delivering a specific error to it and let the rest of the streams
1241  // get a normal unacknowledged stream error.
    // NOTE(review): debugData->data() is passed as a char*, so it is
    // presumably read up to the first NUL byte. IOBuf payloads are not
    // guaranteed to be NUL-terminated -- confirm, and consider bounding the
    // read by debugData->length().
1243  string debugInfo = (debugData) ?
1244  folly::to<string>(" with debug info: ", (char*)debugData->data()) : "";
1246  folly::to<std::string>(getErrorString(err),
1247  " on transaction id: ", *firstStream,
1248  " with codec error: ", getErrorCodeString(code),
1249  debugInfo));
1250  ex.setProxygenError(err);
1251  errorOnTransactionId(*firstStream, std::move(ex));
1252  } else if (firstStream != HTTPCodec::NoStream) {
    // No codec error: the first stream is treated like the others.
1253  ids.push_back(*firstStream);
1254  }
1255 
1257 }
1258 
// Ping request received: generate the reply and queue it ahead of any egress
// already buffered in writeBuf_.
1260  VLOG(4) << *this << " got ping request with id=" << uniqueID;
1261 
// Timestamp is captured before the reply is generated and is handed to the
// byte event tracker below.
1262  TimePoint timestamp = getCurrentTime();
1263 
1264  // Insert the ping reply to the head of writeBuf_
1266  codec_->generatePingReply(pingBuf, uniqueID);
// Size of the reply alone, measured before the existing egress is appended.
1267  size_t pingSize = pingBuf.chainLength();
// Append the existing egress after the reply, then move the combined chain
// back into writeBuf_ so the reply ends up first.
1268  pingBuf.append(writeBuf_.move());
1269  writeBuf_.append(pingBuf.move());
1270 
1271  if (byteEventTracker_) {
1272  byteEventTracker_->addPingByteEvent(pingSize, timestamp, bytesScheduled_);
1273  }
1274 
1275  scheduleWrite();
1276 }
1277 
// Ping reply received: logged here; the only other visible action is gated on
// infoCallback_ being set.
1279  VLOG(4) << *this << " got ping reply with id=" << uniqueID;
1280  if (infoCallback_) {
1282  }
1283 }
1284 
1286  uint32_t amount) {
// Stream-level window update: forwards `amount` to the matching transaction,
// or ignores the update when the stream id is not found.
1287  VLOG(4) << *this << " got window update on streamID=" << streamID << " for "
1288  << amount << " bytes.";
1289  HTTPTransaction* txn = findTransaction(streamID);
1290  if (!txn) {
1291  // We MUST be using SPDY/3+ if we got WINDOW_UPDATE. The spec says that -
1292  //
1293  // A sender should ignore all the WINDOW_UPDATE frames associated with the
1294  // stream after it sends the last frame for the stream.
1295  //
1296  // TODO: Only ignore if this is from some past transaction
1297  return;
1298  }
1299  txn->onIngressWindowUpdate(amount);
1300 }
1301 
// Applies the settings this handler recognizes, then generates a settings ack
// and notifies infoCallback_.
1303  DestructorGuard g(this);
1304  for (auto& setting : settings) {
1305  if (setting.id == SettingsId::INITIAL_WINDOW_SIZE) {
1306  onSetSendWindow(setting.value);
1307  } else if (setting.id == SettingsId::MAX_CONCURRENT_STREAMS) {
1308  onSetMaxInitiatedStreams(setting.value);
1309  } else if (setting.id == SettingsId::SETTINGS_HTTP_CERT_AUTH) {
// A failed verification returns immediately: remaining settings are not
// processed, no ack is generated and infoCallback_ is not notified.
1310  if (!(verifyCertAuthSetting(setting.value))) {
1311  return;
1312  }
1313  }
1314  }
// Only schedule a write when the codec actually produced ack bytes.
1315  if (codec_->generateSettingsAck(writeBuf_) > 0) {
1316  scheduleWrite();
1317  }
1318  if (infoCallback_) {
1319  infoCallback_->onSettings(*this, settings);
1320  }
1321 }
1322 
// Settings ack received: log it and forward to infoCallback_ when one is set.
1324  VLOG(4) << *this << " received settings ack";
1325  if (infoCallback_) {
1326  infoCallback_->onSettingsAck(*this);
1327  }
1328 }
1329 
1331  const HTTPMessage::HTTPPriority& pri) {
// Priority update for streamID. Does nothing unless HTTP/2 priorities are
// enabled for this session.
1332  if (!getHTTP2PrioritiesEnabled()) {
1333  return;
1334  }
// Repackage the three tuple fields of `pri` as an http2::PriorityUpdate.
1335  http2::PriorityUpdate h2Pri{std::get<0>(pri), std::get<1>(pri),
1336  std::get<2>(pri)};
1337  HTTPTransaction* txn = findTransaction(streamID);
1338  if (txn) {
1339  // existing txn, change pri
1340  txn->onPriorityUpdate(h2Pri);
1341  } else {
1342  // virtual node
// No transaction with this id: record the priority in the egress queue
// instead.
1343  txnEgressQueue_.addOrUpdatePriorityNode(streamID, h2Pri);
1344  }
1345 }
1346 
1348  std::unique_ptr<IOBuf> authRequest) {
// CERTIFICATE_REQUEST received: obtain an authenticator from
// secondAuthManager_ for requestId and schedule a write if a frame was
// generated. Returns early when the transport is not an AsyncFizzBase.
// NOTE(review): secondAuthManager_ is dereferenced without a visible null
// check -- confirm it is always set before this can be called.
1349  DestructorGuard dg(this);
1350  VLOG(4) << "CERTIFICATE_REQUEST on" << *this << ", requestId=" << requestId;
1351 
// authenticator.first and authenticator.second are both passed to the frame
// generation call at the end of this function.
1352  std::pair<uint16_t, std::unique_ptr<folly::IOBuf>> authenticator;
1353  auto fizzBase = getTransport()->getUnderlyingTransport<AsyncFizzBase>();
1354  if (fizzBase) {
// The two branches differ only in an argument that is not visible in this
// listing.
1355  if (isUpstream()) {
1356  authenticator =
1357  secondAuthManager_->getAuthenticator(*fizzBase,
1359  requestId,
1360  std::move(authRequest));
1361  } else {
1362  authenticator =
1363  secondAuthManager_->getAuthenticator(*fizzBase,
1365  requestId,
1366  std::move(authRequest));
1367  }
1368  } else {
1369  VLOG(4) << "Underlying transport does not support secondary "
1370  "authentication.";
1371  return;
1372  }
1374  authenticator.first,
1375  std::move(authenticator.second)) > 0) {
1376  scheduleWrite();
1377  }
1378 }
1379 
1381  std::unique_ptr<IOBuf> authenticator) {
// CERTIFICATE received: ask secondAuthManager_ to validate the peer's
// authenticator for certId. The result is only logged here. Returns early
// when the transport is not an AsyncFizzBase.
// NOTE(review): secondAuthManager_ is dereferenced without a visible null
// check -- confirm it is always set before this can be called.
1382  DestructorGuard dg(this);
1383  VLOG(4) << "CERTIFICATE on" << *this << ", certId=" << certId;
1384 
1385  bool isValid = false;
1386  auto fizzBase = getTransport()->getUnderlyingTransport<AsyncFizzBase>();
1387  if (fizzBase) {
// The two branches differ only in an argument that is not visible in this
// listing.
1388  if (isUpstream()) {
1389  isValid = secondAuthManager_->validateAuthenticator(
1390  *fizzBase,
1392  certId,
1393  std::move(authenticator));
1394  } else {
1395  isValid = secondAuthManager_->validateAuthenticator(
1396  *fizzBase,
1398  certId,
1399  std::move(authenticator));
1400  }
1401  } else {
1402  VLOG(4) << "Underlying transport does not support secondary "
1403  "authentication.";
1404  return;
1405  }
// The validation outcome is logged only; no other action is taken in the
// visible code.
1406  if (isValid) {
1407  VLOG(4) << "Successfully validated the authenticator provided by the peer.";
1408  } else {
1409  VLOG(4) << "Failed to validate the authenticator provided by the peer";
1410  }
1411 }
1412 
1414  HTTPCodec::StreamID streamID, std::unique_ptr<HTTPCodec> codec,
1415  const std::string& protocolString) {
// Swaps the session's codec for `codec` as part of a protocol upgrade on
// stream 1, then re-runs codec setup and sends settings. Always returns true;
// precondition violations abort via CHECK.
// NOTE(review): several statements of this function are not visible in this
// listing, so the comments below cover only what is shown.
1416  CHECK_EQ(streamID, 1);
1417  HTTPTransaction* txn = findTransaction(streamID);
1418  CHECK(txn);
1419  // only HTTP1xCodec calls onNativeProtocolUpgrade
1420  CHECK(!codec_->supportsParallelRequests());
1421 
1422  // Reset to defaults
1425 
1426  // overwrite destination, delay current codec deletion until the end
1427  // of the event loop
1428  auto oldCodec = codec_.setDestination(std::move(codec));
// The empty lambda exists only to own oldCodec until the loop callback runs
// and the lambda is destroyed.
1429  sock_->getEventBase()->runInLoop([oldCodec = std::move(oldCodec)] () {});
1430 
1431  onCodecChanged();
1432 
1433  setupCodec();
1434 
1435  // txn will be streamID=1, have to make a placeholder
1436  (void)codec_->createStream();
1437 
1438  // This can happen if flow control was not explicitly set, and it got the
1439  // HTTP1xCodec defaults. Reset to the new codec default
1444  }
1445 
1446  // trigger settings frame that would have gone out in startNow()
1448  if (settings) {
1451  }
1452  sendSettings();
1453  if (connFlowControl_) {
1456  scheduleWrite();
1457  }
1458 
1459  // Convert the transaction that contained the Upgrade header
1464 
// On a non-secure transport, record the upgraded protocol string as the
// application protocol (the remainder of this condition is not fully visible).
1465  if (!transportInfo_.secure &&
1467  transportInfo_.appProtocol->empty())) {
1468  transportInfo_.appProtocol = std::make_shared<string>(
1469  protocolString);
1470  }
1471 
1472  return true;
1473 }
1474 
// Logs the new send window size; the call that consumes windowSize is only
// partially visible in this listing.
1476  VLOG(4) << *this << " got send window size adjustment. new=" << windowSize;
1478  windowSize);
1479 }
1480 
// Handles a new limit on the number of transactions this side may initiate.
// The statement that applies maxTxns is not visible in this listing.
1482  VLOG(4) << *this << " got new maximum number of concurrent txns "
1483  << "we can initiate: " << maxTxns;
// Capture the state beforehand so a change in supportsMoreTransactions() can
// be detected afterwards.
1484  const bool didSupport = supportsMoreTransactions();
1486  if (infoCallback_ && didSupport != supportsMoreTransactions()) {
// Reached only when the answer flipped; which branch runs depends on the
// previous state.
1487  if (didSupport) {
1489  } else {
1491  }
1492  }
1493 }
1494 
// Schedules a write and returns `size`; the statement that computes `size` is
// not visible in this listing.
1497  scheduleWrite();
1498  return size;
1499 }
1500 
// Ingress paused for txn: also pauses ingress on its ex transactions, and
// pauses reads once liveTransactions_ is zero.
// NOTE(review): the statement that updates liveTransactions_ is not visible in
// this listing.
1502  VLOG(4) << *this << " pausing streamID=" << txn->getID() <<
1503  ", liveTransactions_ was " << liveTransactions_;
1504  CHECK_GT(liveTransactions_, 0);
1506  auto exTxns = txn->getExTransactions();
1507  for (auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1508  auto exTxn = findTransaction(*it);
// Ids that no longer resolve to a transaction are skipped.
1509  if (exTxn) {
1510  exTxn->pauseIngress();
1511  }
1512  }
1513 
1514  if (liveTransactions_ == 0) {
1515  pauseReads();
1516  }
1517 }
1518 
// Ingress resumed for txn: also resumes ingress on its ex transactions, and
// resumes reads when liveTransactions_ is exactly 1.
// NOTE(review): the statement that updates liveTransactions_ is not visible in
// this listing.
1520  VLOG(4) << *this << " resuming streamID=" << txn->getID() <<
1521  ", liveTransactions_ was " << liveTransactions_;
1523  auto exTxns = txn->getExTransactions();
1524  for (auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1525  auto exTxn = findTransaction(*it);
// Ids that no longer resolve to a transaction are skipped.
1526  if (exTxn) {
1527  exTxn->resumeIngress();
1528  }
1529  }
1530 
1531  if (liveTransactions_ == 1) {
1532  resumeReads();
1533  }
1534 }
1535 
// Handles a timeout on txn. If the transaction has no handler and its egress
// state machine is still in Start, it is either aborted or given a direct
// error handler; otherwise (and after a handler is installed) the timeout is
// forwarded to the transaction.
1536 void
1538  // A transaction has timed out. If the transaction does not have
1539  // a Handler yet, because we haven't yet received the full request
1540  // headers, we give it a DirectResponseHandler that generates an
1541  // error page.
1542  VLOG(3) << "Transaction timeout for streamID=" << txn->getID();
1543  if (!codec_->supportsParallelRequests()) {
1544  // this error should only prevent us from reading/handling more errors
1545  // on serial streams
1546  ingressError_ = true;
1547  }
1548 
1549  if (!txn->getHandler() &&
1550  txn->getEgressState() == HTTPTransactionEgressSM::State::Start) {
1551  VLOG(4) << *this << " Timed out receiving headers";
1552  if (infoCallback_) {
1554  }
// NOTE(review): the condition guarding the abort path below is not visible in
// this listing.
1556  // This can only happen with HTTP/2 where the HEADERS frame is incomplete
1557  // and we time out waiting for the CONTINUATION. Abort the request.
1558  //
1559  // It would maybe be a little nicer to use the timeout handler for these
1560  // also.
1561  txn->sendAbort();
1562  return;
1563  }
1564 
1565  VLOG(4) << *this << " creating direct error handler";
1567  txn->setHandler(handler);
1568  }
1569 
1570  // Tell the transaction about the timeout. The transaction will
1571  // communicate the timeout to the handler, and the handler will
1572  // decide how to proceed.
1573  txn->onIngressTimeout();
1574 }
1575 
1577  const HTTPMessage& headers,
1579  bool includeEOM) noexcept {
// Serializes `headers` for txn, records byte events, optionally finishes the
// message (includeEOM) and schedules a write. Requires the session to have
// been started (CHECK).
// NOTE(review): the codec calls that generate the header bytes are only
// partially visible in this listing.
1580  CHECK(started_);
1581  unique_ptr<IOBuf> goawayBuf;
1582  if (shouldShutdown()) {
1583  // For HTTP/1.1, add Connection: close
1584  // For SPDY, save the goaway for AFTER the request
// Set the already-queued egress aside so that whatever drainImpl() puts into
// writeBuf_ can be captured separately as goawayBuf.
1585  auto writeBuf = writeBuf_.move();
1586  drainImpl();
1587  goawayBuf = writeBuf_.move();
1589  }
1590  if (isUpstream() || (txn->isPushed() && headers.isRequest())) {
1591  // upstream picks priority
1592  if (getHTTP2PrioritiesEnabled()) {
1593  auto pri = getMessagePriority(&headers);
1594  txn->onPriorityUpdate(pri);
1595  }
1596  }
1597 
// Reusability and byte offset are sampled before generation; both are compared
// or passed on after the headers have been generated.
1598  const bool wasReusable = codec_->isReusable();
1599  const uint64_t oldOffset = sessionByteOffset();
1600  auto exAttributes = txn->getExAttributes();
1601  auto assocStream = txn->getAssocTxnId();
// Three generation paths: ex attributes present, a request with an associated
// stream, or the plain case.
1602  if (exAttributes) {
1604  txn->getID(),
1605  headers,
1606  *exAttributes,
1607  includeEOM,
1608  size);
1609  } else if (headers.isRequest() && assocStream) {
1610  // Only PUSH_PROMISE (not push response) has an associated stream
1612  txn->getID(),
1613  headers,
1614  *assocStream,
1615  includeEOM,
1616  size);
1617  } else {
1619  txn->getID(),
1620  headers,
1621  includeEOM,
1622  size);
1623  }
1624  const uint64_t newOffset = sessionByteOffset();
1625 
1626  // for push response count towards the MAX_CONCURRENT_STREAMS limit
1627  if (isDownstream() && headers.isResponse() && txn->isPushed()) {
1629  }
1630 
1631  // For all upstream headers, addFirstHeaderByteEvent should be added
1632  // For all downstream, only response headers need addFirstHeaderByteEvent
1633  bool shouldAddFirstHeaderByteEvent = isUpstream() ||
1634  (isDownstream() && headers.isResponse());
// Only add the event when bytes were actually produced (offset advanced) and
// testAndSetFirstHeaderByteSent() returns false.
1635  if (shouldAddFirstHeaderByteEvent && newOffset > oldOffset &&
1636  !txn->testAndSetFirstHeaderByteSent() && byteEventTracker_) {
1637  byteEventTracker_->addFirstHeaderByteEvent(newOffset, txn);
1638  }
1639 
1640  if (size) {
1641  VLOG(4) << *this << " sending headers, size=" << size->compressed
1642  << ", uncompressedSize=" << size->uncompressed;
1643  }
1644  if (goawayBuf) {
1645  VLOG(4) << *this << " moved GOAWAY to end of writeBuf";
1646  writeBuf_.append(std::move(goawayBuf));
1647  }
1648  if (includeEOM) {
1649  commonEom(txn, 0, true);
1650  }
1651  scheduleWrite();
1652  onHeadersSent(headers, wasReusable);
1653 }
1654 
// Shared end-of-message bookkeeping used by the send paths in this file.
// NOTE(review): the callee names on the two elided lines are not visible in
// this listing; only the arguments passed are shown.
1655 void
1657  HTTPTransaction* txn,
1658  size_t encodedSize,
1659  bool piggybacked) noexcept {
1661  byteEventTracker_.get(), txn, encodedSize, sessionByteOffset(),
1662  piggybacked);
1664 }
1665 
// Encodes `body` for txn into writeBuf_ and updates write accounting and byte
// events. Returns the number of bytes the codec reported as encoded.
// Must be called from within the loop callback (CHECK(inLoopCallback_)).
1666 size_t
1668  std::unique_ptr<folly::IOBuf> body,
1669  bool includeEOM,
1670  bool trackLastByteFlushed) noexcept {
// Offset is sampled before generateBody() so it refers to the start of this
// body chunk.
1671  uint64_t offset = sessionByteOffset();
// A null body is treated as zero bytes.
1672  size_t bodyLen = body ? body->computeChainDataLength(): 0;
1673  size_t encodedSize = codec_->generateBody(writeBuf_,
1674  txn->getID(),
1675  std::move(body),
1677  includeEOM);
1678  CHECK(inLoopCallback_);
// Accounting uses the raw body length, not the encoded size.
1679  pendingWriteSizeDelta_ -= bodyLen;
1680  bodyBytesPerWriteBuf_ += bodyLen;
1681  if (encodedSize > 0 && !txn->testAndSetFirstByteSent() && byteEventTracker_) {
1682  byteEventTracker_->addFirstBodyByteEvent(offset, txn);
1683  }
1684 
1685  if (trackLastByteFlushed && encodedSize > 0 && byteEventTracker_) {
1686  byteEventTracker_->addTrackedByteEvent(txn, offset + encodedSize);
1687  }
1688 
1689  if (includeEOM) {
1690  VLOG(5) << *this << " sending EOM in body for streamID=" << txn->getID();
1691  commonEom(txn, encodedSize, true);
1692  }
1693  return encodedSize;
1694 }
1695 
1697  size_t length) noexcept {
// Writes a chunk header of `length` for txn into writeBuf_, schedules a write
// and returns the encoded size reported by the codec.
1698  size_t encodedSize = codec_->generateChunkHeader(writeBuf_,
1699  txn->getID(),
1700  length);
1701  scheduleWrite();
1702  return encodedSize;
1703 }
1704 
1706  HTTPTransaction* txn) noexcept {
// Writes a chunk terminator for txn into writeBuf_, schedules a write and
// returns the encoded size reported by the codec.
1707  size_t encodedSize = codec_->generateChunkTerminator(writeBuf_,
1708  txn->getID());
1709  scheduleWrite();
1710  return encodedSize;
1711 }
1712 
// Called when egress for txn is finished. Decrements the transaction count
// and then either shuts the transport down (immediately with RST, or deferred
// to a loop callback) or tries to resume a paused pipelined transaction.
// Requires at least one transaction to exist (CHECK).
1713 void
1715  // If the semantics of the protocol don't permit more messages
1716  // to be read or sent on this connection, close the socket in one or
1717  // more directions.
1718  CHECK(!transactions_.empty());
1719 
1720  if (infoCallback_) {
1722  }
// Sampled before decrementTransactionCount(); used on the pipelining path at
// the end of this function.
1723  auto oldStreamCount = getPipelineStreamCount();
1724  decrementTransactionCount(txn, false, true);
// Shut down when an RST was requested, or when this is the only transaction
// left and the connection cannot be reused or reads are already shut down.
1725  if (withRST || ((!codec_->isReusable() || readsShutdown()) &&
1726  transactions_.size() == 1)) {
1727  // We should shutdown reads if we are closing with RST or we aren't
1728  // interested in any further messages (ie if we are a downstream session).
1729  // Upgraded sessions have independent ingress and egress, and the reads
1730  // need not be shutdown on egress finish.
1731  if (withRST) {
1732  // Let any queued writes complete, but send a RST when done.
1733  VLOG(4) << *this << " resetting egress after this message";
1736  shutdownTransport(true, true);
1737  } else {
1738  // the reason is already set (either not reusable or readshutdown).
1739 
1740  // Defer normal shutdowns until the end of the loop. This
1741  // handles an edge case with direct responses with Connection:
1742  // close served before ingress EOM. The remainder of the ingress
1743  // message may be in the parse loop, so give it a chance to
1744  // finish out and avoid a kErrorEOF
1745 
1746  // we can get here during shutdown, in that case do not schedule a
1747  // shutdown callback again
1748  if (!shutdownTransportCb_) {
1749  // Just for safety, the following bumps the refcount on this session
1750  // to keep it live until the loopCb runs
1752  sock_->getEventBase()->runInLoop(shutdownTransportCb_.get(), true);
1753  }
1754  }
1755  } else {
1757  txn->getSequenceNumber());
1758  }
1759 }
1760 
1762  const HTTPHeaders* trailers) noexcept {
// Finishes the egress message for txn, optionally emitting trailers first.
// Returns the total number of bytes the codec reported as encoded.
// trailers: may be null, in which case no trailers are generated.
1763 
1764  VLOG(4) << *this << " sending EOM for streamID=" << txn->getID()
1765  << " trailers=" << (trailers ? "yes" : "no");
1766 
1767  size_t encodedSize = 0;
1768  if (trailers) {
1769  encodedSize = codec_->generateTrailers(writeBuf_, txn->getID(), *trailers);
1770  }
1771 
1772  // Don't send EOM for HTTP2, when trailers sent.
1773  // sendTrailers already flagged end of stream.
1774  bool http2Trailers = trailers && isHTTP2CodecProtocol(codec_->getProtocol());
1775  if (!http2Trailers) {
1776  encodedSize += codec_->generateEOM(writeBuf_, txn->getID());
1777  }
1778 
// piggybacked=false: this EOM was not carried by a headers or body call.
1779  commonEom(txn, encodedSize, false);
1780  return encodedSize;
1781 }
1782 
// Aborts txn with statusCode. Returns the number of bytes the codec generated
// for the abort, which may be zero.
1785  // Ask the codec to generate an abort indicator for the transaction.
1786  // Depending on the protocol, this may be a no-op.
1787  // Schedule a network write to send out whatever egress we might
1788  // have queued up.
1789  VLOG(4) << *this << " sending abort for streamID=" << txn->getID();
1790  // drain this transaction's writeBuf instead of flushing it
1791  // then enqueue the abort directly into the Session buffer,
1792  // hence with max priority.
1793  size_t encodedSize = codec_->generateRstStream(writeBuf_,
1794  txn->getID(),
1795  statusCode);
1796 
1797  if (!codec_->isReusable()) {
1798  // HTTP 1x codec does not support per stream abort so this will
1799  // render the codec not reusable
1801  }
1802 
1803  scheduleWrite();
1804 
1805  // If the codec wasn't able to write a L7 message for the abort, then
1806  // fall back to closing the transport with a TCP level RST
// !encodedSize is passed as the withRST argument: true only when zero bytes
// were generated.
1807  onEgressMessageFinished(txn, !encodedSize);
1808  return encodedSize;
1809 }
1810 
1812  const http2::PriorityUpdate& pri) noexcept {
// Thin wrapper: forwards the transaction's id and `pri` to sendPriorityImpl()
// and returns its result.
1813  return sendPriorityImpl(txn->getID(), pri);
1814 }
1815 
1817  std::unique_ptr<SecondaryAuthManager> secondAuthManager) {
// Takes ownership of secondAuthManager by moving it into secondAuthManager_,
// replacing whatever was stored before.
1818  secondAuthManager_ = std::move(secondAuthManager);
1819 }
1820 
// Returns the pointer held by secondAuthManager_ without transferring
// ownership; presumably null when no manager has been set -- TODO confirm.
1822  return secondAuthManager_.get();
1823 }
1824 
1830  std::unique_ptr<folly::IOBuf> certificateRequestContext,
1831  std::vector<fizz::Extension> extensions) {
// Builds an auth request via secondAuthManager_ and writes a
// CERTIFICATE_REQUEST frame. Returns the encoded size, or 0 when either side
// has not advertised SETTINGS_HTTP_CERT_AUTH or frame generation fails.
// NOTE(review): secondAuthManager_ is dereferenced without a visible null
// check -- confirm it is always set before this can be called.
1832  // Check if both sending and receiving peer have advertised valid
1833  // SETTINGS_HTTP_CERT_AUTH setting. Otherwise, the frames for secondary
1834  // authentication should not be sent.
1835  auto ingressSettings = codec_->getIngressSettings();
1836  auto egressSettings = codec_->getEgressSettings();
// The check is skipped entirely when either settings object is null, in which
// case the request is still sent.
1837  if (ingressSettings && egressSettings) {
1838  if (ingressSettings->getSetting(SettingsId::SETTINGS_HTTP_CERT_AUTH, 0) ==
1839  0 ||
1840  egressSettings->getSetting(SettingsId::SETTINGS_HTTP_CERT_AUTH, 0) ==
1841  0) {
1842  VLOG(4) << "Secondary certificate authentication is not supported.";
1843  return 0;
1844  }
1845  }
1846  auto authRequest = secondAuthManager_->createAuthRequest(
1847  std::move(certificateRequestContext), std::move(extensions));
1848  auto encodedSize = codec_->generateCertificateRequest(
1849  writeBuf_, authRequest.first, std::move(authRequest.second));
1850  if (encodedSize > 0) {
1851  scheduleWrite();
1852  } else {
1853  VLOG(4) << "Failed to generate CERTIFICATE_REQUEST frame.";
1854  }
1855  return encodedSize;
1856 }
1857 
1859  bool ingressEOM,
1860  bool egressEOM) {
// Decrements outgoingStreams_ or incomingStreams_ for txn, at most once per
// transaction (gated by testAndClearActive()).
// Non-pushed upstream and pushed downstream transactions are counted as
// outgoing and release on ingressEOM; all others are counted as incoming and
// release on egressEOM.
1861  if ((isUpstream() && !txn->isPushed()) ||
1862  (isDownstream() && txn->isPushed())) {
1863  if (ingressEOM && txn->testAndClearActive()) {
1864  outgoingStreams_--;
1865  }
1866  } else {
1867  if (egressEOM && txn->testAndClearActive()) {
1868  incomingStreams_--;
1869  }
1870  }
1871 }
1872 
1873 // This is a kludgy function because it requires the caller to remember
1874 // the old value of pipelineStreamCount from before it calls
1875 // decrementTransactionCount. I'm trying to avoid yet more state in
1876 // HTTPSession. If decrementTransactionCount actually closed a stream
1877 // and there is still a pipelinable stream, then it was pipelining
//
// oldStreamCount: pipeline stream count sampled before the decrement.
// txnSeqn: sequence number of the transaction that just finished.
// Returns true when the next transaction's ingress was resumed, else false.
1878 bool
1880  size_t oldStreamCount, uint32_t txnSeqn) {
// Applies only to codecs without parallel requests, when the pipeline count
// dropped and exactly one pipelined stream remains.
1881  if (!codec_->supportsParallelRequests() && !transactions_.empty() &&
1882  getPipelineStreamCount() < oldStreamCount &&
1883  getPipelineStreamCount() == 1) {
// The last entry of transactions_ is taken as the next transaction; the
// DCHECKs below assert it is the immediate successor and is still paused.
1884  auto& nextTxn = transactions_.rbegin()->second;
1885  DCHECK_EQ(nextTxn.getSequenceNumber(), txnSeqn + 1);
1886  DCHECK(!nextTxn.isIngressComplete());
1887  DCHECK(nextTxn.isIngressPaused());
1888  VLOG(4) << "Resuming paused pipelined txn " << nextTxn;
1889  nextTxn.resumeIngress();
1890  return true;
1891  }
1892  return false;
1893 }
1894 
// Removes txn from the session: fixes up liveTransactions_, unlinks it from
// its associated/control transactions, erases it from transactions_, and then
// checks whether reads should resume or the transport should shut down.
// NOTE(review): txn is not used after transactions_.erase(it) in the visible
// code; presumably the erase destroys it -- confirm before adding later uses.
1895 void
1897  DestructorGuard guard(this);
// Id and sequence number are copied out up front; they are still needed after
// the transaction has been erased below.
1898  HTTPCodec::StreamID streamID = txn->getID();
1899  auto txnSeqn = txn->getSequenceNumber();
1900  auto it = transactions_.find(txn->getID());
1901  DCHECK(it != transactions_.end());
1902 
1903  if (txn->isIngressPaused()) {
1904  // Someone detached a transaction that was paused. Make the resumeIngress
1905  // call to keep liveTransactions_ in order
1906  VLOG(4) << *this << " detached paused transaction=" << streamID;
1907  resumeIngress(txn);
1908  }
1909 
1910  VLOG(4) << *this << " removing streamID=" << streamID <<
1911  ", liveTransactions was " << liveTransactions_;
1912  CHECK_GT(liveTransactions_, 0);
1913  liveTransactions_--;
1914 
// Unlink this transaction from the transactions that reference it by id.
1915  if (txn->isPushed()) {
1916  auto assocTxn = findTransaction(*txn->getAssocTxnId());
1917  if (assocTxn) {
1918  assocTxn->removePushedTransaction(streamID);
1919  }
1920  }
1921  if (txn->getControlStream()) {
1922  auto controlTxn = findTransaction(*txn->getControlStream());
1923  if (controlTxn) {
1924  controlTxn->removeExTransaction(streamID);
1925  }
1926  }
1927 
// Sampled before the decrement so the pipelining check below can detect that
// a stream was closed.
1928  auto oldStreamCount = getPipelineStreamCount();
1929  decrementTransactionCount(txn, true, true);
1930  transactions_.erase(it);
1931 
1932  if (transactions_.empty()) {
1934  if (infoCallback_) {
1936  }
1937  if (getConnectionManager()) {
1939  }
1940  } else {
1941  if (infoCallback_) {
1943  }
1944  }
1945 
1946  if (!readsShutdown()) {
// When a paused pipelined transaction was resumed, return without running the
// shutdown checks below.
1947  if (maybeResumePausedPipelinedTransaction(oldStreamCount, txnSeqn)) {
1948  return;
1949  } else {
1950  // this will resume reads if they were paused (eg: 0 HTTP transactions)
1951  resumeReads();
1952  }
1953  }
1954 
1955  if (liveTransactions_ == 0 && transactions_.empty() && !isScheduled()) {
1956  resetTimeout();
1957  }
1958 
1959  // It's possible that this is the last transaction in the session,
1960  // so check whether the conditions for shutdown are satisfied.
1961  if (transactions_.empty()) {
1962  if (shouldShutdown()) {
1963  writesDraining_ = true;
1964  }
1965  // Handle the case where we are draining writes but all remaining
1966  // transactions terminated with no egress.
1967  if (writesDraining_ && !writesShutdown() && !hasMoreWrites()) {
1968  shutdownTransport(false, true);
1969  return;
1970  }
1971  }
1972  checkForShutdown();
1973 }
1974 
// Generates a window update of `bytes` for txn into writeBuf_. Returns the
// number of bytes generated; a write is scheduled only when that is non-zero.
1975 size_t
1977  uint32_t bytes) noexcept {
1978  size_t sent = codec_->generateWindowUpdate(writeBuf_, txn->getID(), bytes);
1979  if (sent) {
1980  scheduleWrite();
1981  }
1982  return sent;
1983 }
1984 
// Called after ingress body bytes have been processed. May resume reads and,
// when connection-level flow control is in use, may schedule a write.
// NOTE(review): the conditions guarding both actions are only partially
// visible in this listing.
1985 void
1988  resumeReads();
1989  }
1990  if (connFlowControl_ &&
1992  scheduleWrite();
1993  }
1994 }
1995 
// Accumulates `bytes` into pendingWriteSizeDelta_ and makes sure the pending
// delta gets applied, either right away or from the loop callback.
1996 void
1998  pendingWriteSizeDelta_ += bytes;
1999  // any net change requires us to update pause/resume state in the
2000  // loop callback
2001  if (pendingWriteSizeDelta_ > 0) {
2002  // pause inline, resume in loop
// updateWriteBufSize(0) applies the accumulated delta immediately.
2003  updateWriteBufSize(0);
2004  } else if (!isLoopCallbackScheduled()) {
// Non-positive delta: defer to the loop callback, scheduling it if it is not
// already scheduled.
2005  sock_->getEventBase()->runInLoop(this);
2006  }
2007 }
2008 
2010  TransportInfo* tinfo) const {
// Fills *tinfo from the underlying AsyncSocket. Returns true on success, or
// false (leaving *tinfo untouched) when the transport has no AsyncSocket.
2011  auto sock = sock_->getUnderlyingTransport<AsyncSocket>();
2012  if (sock) {
2013  tinfo->initWithSocket(sock);
2014  return true;
2015  }
2016  return false;
2017 }
2018 
// Copies setup-time fields from transportInfo_ into *tinfo and, on Linux and
// FreeBSD, refreshes the cached RTT/rtx in transportInfo_ from *tinfo.
// Returns true on the visible success path and false otherwise.
// NOTE(review): the guarding condition and several field copies are not
// visible in this listing.
2021  // some fields are the same with the setup transport info
2023  tinfo->secure = transportInfo_.secure;
2029  tinfo->sslError = transportInfo_.sslError;
2030 #if defined(__linux__) || defined(__FreeBSD__)
2031  // update connection transport info with the latest RTT
// A non-positive tcpi_rtt leaves the cached RTT values unchanged.
2032  if (tinfo->tcpinfo.tcpi_rtt > 0) {
2033  transportInfo_.tcpinfo.tcpi_rtt = tinfo->tcpinfo.tcpi_rtt;
2034  transportInfo_.rtt = std::chrono::microseconds(tinfo->tcpinfo.tcpi_rtt);
2035  }
2036  transportInfo_.rtx = tinfo->rtx;
2037 #endif
2038  return true;
2039  }
2040  return false;
2041 }
2042 
// Produces the next buffer to hand to the socket, letting queued transactions
// write body data into writeBuf_ first.
//
// cork: out-param; set to true when more data is expected to follow.
// eom:  out-param; reset to false here and then passed to
//       byteEventTracker_->preSend(), which may update it.
// Returns nullptr when a write is already in flight or writes are shut down;
// otherwise the whole of writeBuf_ or a split-off prefix of it.
2043 unique_ptr<IOBuf> HTTPSession::getNextToSend(bool* cork, bool* eom) {
2044  // limit ourselves to one outstanding write at a time (onWriteSuccess calls
2045  // scheduleWrite)
2046  if (numActiveWrites_ > 0 || writesShutdown()) {
2047  VLOG(4) << "skipping write during this loop, numActiveWrites_=" <<
2048  numActiveWrites_ << " writesShutdown()=" << writesShutdown();
2049  return nullptr;
2050  }
2051 
2052  // We always tack on at least one body packet to the current write buf
2053  // This ensures that a short HTTPS response will go out in a single SSL record
2054  while (!txnEgressQueue_.empty()) {
// Per-iteration budget: kWriteReadyMax, further capped by the connection-level
// send window when flow control is in use.
2055  uint32_t toSend = kWriteReadyMax;
2056  if (connFlowControl_) {
2057  if (connFlowControl_->getAvailableSend() == 0) {
2058  VLOG(4) << "Session-level send window is full, skipping remaining "
2059  << "body writes this loop";
2060  break;
2061  }
2062  toSend = std::min(toSend, connFlowControl_->getAvailableSend());
2063  }
// NOTE(review): the statement that populates nextEgressResults_ is not visible
// in this listing.
2066  CHECK(!nextEgressResults_.empty()); // Queue was non empty, so this must be
2067  // The maximum we will send for any transaction in this loop
// .second is multiplied with toSend; presumably it is each transaction's
// fractional share of the budget -- TODO confirm.
2068  uint32_t txnMaxToSend = toSend * nextEgressResults_.front().second;
2069  if (txnMaxToSend == 0) {
2070  // toSend is smaller than the number of transactions. Give all egress
2071  // to the first transaction
2072  nextEgressResults_.erase(++nextEgressResults_.begin(),
2073  nextEgressResults_.end());
2074  txnMaxToSend = std::min(toSend, egressBodySizeLimit_);
2075  nextEgressResults_.front().second = 1;
2076  }
2077  if (nextEgressResults_.size() > 1 && txnMaxToSend > egressBodySizeLimit_) {
2078  // Cap the max to egressBodySizeLimit_, and recompute toSend accordingly
2079  txnMaxToSend = egressBodySizeLimit_;
2080  toSend = txnMaxToSend / nextEgressResults_.front().second;
2081  }
2082  // split allowed by relative weight, with some minimum
2083  for (auto txnPair: nextEgressResults_) {
2084  uint32_t txnAllowed = txnPair.second * toSend;
2085  if (nextEgressResults_.size() > 1) {
2086  CHECK_LE(txnAllowed, egressBodySizeLimit_);
2087  }
2088  if (connFlowControl_) {
2089  CHECK_LE(txnAllowed, connFlowControl_->getAvailableSend());
2090  }
2091  if (txnAllowed == 0) {
2092  // The ratio * toSend was so small this txn gets nothing.
2093  VLOG(4) << *this << " breaking egress loop on 0 txnAllowed";
2094  break;
2095  }
2096 
2097  VLOG(4) << *this << " egressing txnID=" << txnPair.first->getID() <<
2098  " allowed=" << txnAllowed;
2099  txnPair.first->onWriteReady(txnAllowed, txnPair.second);
2100  }
2101  nextEgressResults_.clear();
2102  // it can be empty because of HTTPTransaction rate limiting. We should
2103  // change rate limiting to clearPendingEgress while waiting.
// Stop as soon as there is something to write; otherwise loop again while the
// egress queue is non-empty.
2104  if (!writeBuf_.empty()) {
2105  break;
2106  }
2107  }
2108  *eom = false;
2109  if (byteEventTracker_) {
2110  uint64_t needed = byteEventTracker_->preSend(cork, eom, bytesWritten_);
2111  if (needed > 0) {
2112  VLOG(5) << *this << " writeBuf_.chainLength(): "
2113  << writeBuf_.chainLength() << " txnEgressQueue_.empty(): "
2114  << txnEgressQueue_.empty();
2115 
2116  if (needed < writeBuf_.chainLength()) {
2117  // split the next EOM chunk
2118  VLOG(5) << *this << " splitting " << needed << " bytes out of a "
2119  << writeBuf_.chainLength() << " bytes IOBuf";
// More data remains in writeBuf_ after the split, so cork is forced on.
2120  *cork = true;
2121  if (sessionStats_) {
2123  }
2124  return writeBuf_.split(needed);
2125  } else {
// The tracker must never ask for more than is buffered.
2126  CHECK_EQ(needed, writeBuf_.chainLength());
2127  }
2128  }
2129  }
2130 
2131  // cork if there are txns with pending egress and room to send them
2132  *cork = !txnEgressQueue_.empty() && !isConnWindowFull();
2133  return writeBuf_.move();
2134 }
2135 
// Loop callback: performs up to kMaxWritesPerLoop socket writes from the
// buffered egress, then processes pending read data if reads are unpaused.
// inLoopCallback_ is true for the duration of the call.
2136 void
2138  // We schedule this callback to run at the end of an event
2139  // loop iteration if either of two conditions has happened:
2140  // * The session has generated some egress data (see scheduleWrite())
2141  // * Reads have become unpaused (see resumeReads())
2142  DestructorGuard dg(this);
2143  inLoopCallback_ = true;
// Runs on every exit path, including the early return inside the loop below.
2144  auto scopeg = folly::makeGuard([this] {
2145  inLoopCallback_ = false;
2146  // This ScopeGuard needs to be under the above DestructorGuard
2147  if (pendingWriteSizeDelta_) {
2148  updateWriteBufSize(0);
2149  }
2150  checkForShutdown();
2151  });
2152  VLOG(5) << *this << " in loop callback";
2153 
2154  for (uint32_t i = 0; i < kMaxWritesPerLoop; ++i) {
2156  if (isPrioritySampled()) {
2160  }
2161 
2162  bool cork = true;
2163  bool eom = false;
2164  unique_ptr<IOBuf> writeBuf = getNextToSend(&cork, &eom);
2165 
// A null buffer means nothing can be written now (see getNextToSend()).
2166  if (!writeBuf) {
2167  break;
2168  }
2169  uint64_t len = writeBuf->computeChainDataLength();
2170  VLOG(11) << *this
2171  << " bytes of egress to be written: " << len
2172  << " cork:" << cork << " eom:" << eom;
// An empty buffer returns before the read-processing section below; the scope
// guard above still runs.
2173  if (len == 0) {
2174  checkForShutdown();
2175  return;
2176  }
2177 
2178  if (isPrioritySampled()) {
2182  }
2183 
// NOTE(review): raw new with no delete in this function; presumably the
// segment deletes itself from its write callbacks -- confirm.
2184  WriteSegment* segment = new WriteSegment(this, len);
2185  segment->setCork(cork);
2186  segment->setEOR(eom);
2187 
2188  pendingWrites_.push_back(*segment);
2189  if (!writeTimeout_.isScheduled()) {
2190  // Any performance concern here?
2192  }
2193  numActiveWrites_++;
2194  VLOG(4) << *this << " writing " << len << ", activeWrites="
2195  << numActiveWrites_ << " cork=" << cork << " eom=" << eom;
2196  bytesScheduled_ += len;
2197  sock_->writeChain(segment, std::move(writeBuf), segment->getFlags());
// A non-zero count after writeChain() means the write has not completed yet;
// stop writing for this loop iteration.
2198  if (numActiveWrites_ > 0) {
2199  updateWriteCount();
2200  pendingWriteSizeDelta_ += len;
2201  // updateWriteBufSize called in scope guard
2202  break;
2203  }
2204  // writeChain can result in a writeError and trigger the shutdown code path
2205  }
// Reschedule when more egress is pending and no write is outstanding (the
// last part of this condition is not visible in this listing).
2206  if (numActiveWrites_ == 0 && !writesShutdown() && hasMoreWrites() &&
2208  scheduleWrite();
2209  }
2210 
2211  if (readsUnpaused()) {
2212  processReadData();
2213 
2214  // Install the read callback if necessary
2215  if (readsUnpaused() && !sock_->getReadCallback()) {
2216  sock_->setReadCB(this);
2217  }
2218  }
2219  // checkForShutdown is now in ScopeGuard
2220 }
2221 
// Schedules this session's loop callback on the event base. Does nothing when
// the callback is already scheduled or when there is neither buffered data in
// writeBuf_ nor a transaction in the egress queue.
2222 void
2224  // Do all the network writes for this connection in one batch at
2225  // the end of the current event loop iteration. Writing in a
2226  // batch helps us packetize the network traffic more efficiently,
2227  // as well as saving a few system calls.
2228  if (!isLoopCallbackScheduled() &&
2229  (writeBuf_.front() || !txnEgressQueue_.empty())) {
2230  VLOG(5) << *this << " scheduling write callback";
2231  sock_->getEventBase()->runInLoop(this);
2232  }
2233 }
2234 
// Keeps writes_ in step with numActiveWrites_: PAUSED while a socket write is
// outstanding, UNPAUSED once none remain. No-op when the state already
// matches.
2235 void
2237  if (numActiveWrites_ > 0 && writesUnpaused()) {
2238  // A write is outstanding. Mark egress as paused.
2239  VLOG(3) << "Pausing egress for " << *this;
2240  writes_ = SocketState::PAUSED;
2241  } else if (numActiveWrites_ == 0 && writesPaused()) {
2242  // No writes are outstanding any more. Mark egress as unpaused.
2243  VLOG(3) << "Resuming egress for " << *this;
2244  writes_ = SocketState::UNPAUSED;
2245  }
2246 }
2247 
// Applies `delta` plus any accumulated pendingWriteSizeDelta_ to the pending
// write size and reacts when the egress limit is crossed in either direction.
// While inResume_ is set, a pause is deferred via pendingPause_ and a resume
// only cancels a previously deferred pause.
// NOTE(review): the statements that actually pause/resume egress are not
// visible in this listing.
2248 void
2250  // This is the sum of body bytes buffered within transactions_ and in
2251  // the sock_'s write buffer.
2252  delta += pendingWriteSizeDelta_;
// Sampled before the update so a crossing of the limit can be detected.
2254  bool wasExceeded = egressLimitExceeded();
2255  updatePendingWriteSize(delta);
2256 
2257  if (egressLimitExceeded() && !wasExceeded) {
2258  // Crossed above the limit. Pause transaction egress.
2259  if (inResume_) {
2260  VLOG(3) << "Pausing txn egress for " << *this << " deferred";
2261  pendingPause_ = true;
2262  } else {
2263  VLOG(3) << "Pausing txn egress for " << *this;
2265  }
2266  } else if (!egressLimitExceeded() && wasExceeded) {
2267  // Dropped below the limit. Resume transaction egress if needed.
2268  if (inResume_) {
2269  if (pendingPause_) {
2270  VLOG(3) << "Cancel deferred txn egress pause for " << *this;
2271  pendingPause_ = false;
2272  } else {
2273  VLOG(3) << "Ignoring redundant resume for " << *this;
2274  }
2275  } else {
2276  VLOG(3) << "Resuming txn egress for " << *this;
2278  }
2279  }
2280 }
2281 
// Shuts down the read side and/or the write side of the transport.
//
// Visible steps:
//  1. Pick a ProxygenError describing the reason (SSL error, write error,
//     connection reset, timeout or EOF).
//  2. Promote a read-only shutdown to read+write when the flow control
//     timeout is scheduled.
//  3. Write side: either finish immediately (writes drained) or mark
//     writesDraining_ so the shutdown completes later.
//  4. Read side: uninstall the read callback, report the ingress error/EOF and
//     pause the parser.
//  5. Build an HTTPException describing the shutdown, then call
//     checkForShutdown().
//
// errorMsg is appended to the exception text when non-empty.
// NOTE(review): many listing lines are missing from this extract (2283 the
// signature, 2289, 2299, 2305, 2311, 2326-2328, 2333, 2337, 2356, 2365,
// 2374, 2376, 2383), including the branch conditions around the error
// selection and the statement that delivers `ex`. Treat the outline above as
// a reading of the visible lines only.
2282 void
2284  bool shutdownWrites,
2285  const std::string& errorMsg) {
  // Keeps `this` alive for the duration of the call.
2286  DestructorGuard guard(this);
2287 
2288  // shutdowns not accounted for, shouldn't see any
2290 
2291  VLOG(4) << "shutdown request for " << *this << ": reads="
2292  << shutdownReads << " (currently " << readsShutdown()
2293  << "), writes=" << shutdownWrites << " (currently "
2294  << writesShutdown() << ")";
2295 
2296  bool notifyEgressShutdown = false;
2297  bool notifyIngressShutdown = false;
2298 
2300  if (!transportInfo_.sslError.empty()) {
2301  error = kErrorSSL;
2302  } else if (sock_->error()) {
2303  VLOG(3) << "shutdown request for " << *this
2304  << " on bad socket. Shutting down writes too.";
2306  error = kErrorWrite;
2307  } else {
2308  error = kErrorConnectionReset;
2309  }
2310  shutdownWrites = true;
2312  error = kErrorTimeout;
2313  } else {
2314  error = kErrorEOF;
2315  }
2316 
2317  if (shutdownReads && !shutdownWrites && flowControlTimeout_.isScheduled()) {
2318  // reads are dead and writes are blocked on a window update that will never
2319  // come. shutdown writes too.
2320  VLOG(4) << *this << " Converting read shutdown to read/write due to"
2321  " flow control";
2322  shutdownWrites = true;
2323  }
2324 
2325  if (shutdownWrites && !writesShutdown()) {
2329  scheduleWrite();
2330  }
  // Nothing left to write and either no transactions remain or the codec
  // closes on egress completion: the write side can be finished right now.
2331  if (!hasMoreWrites() &&
2332  (transactions_.empty() || codec_->closeOnEgressComplete())) {
2334  if (byteEventTracker_) {
2335  byteEventTracker_->drainByteEvents();
2336  }
2338  VLOG(4) << *this << " writes drained, sending RST";
2339  resetSocketOnShutdown_ = true;
2340  shutdownReads = true;
2341  } else {
2342  VLOG(4) << *this << " writes drained, closing";
2343  sock_->shutdownWriteNow();
2344  }
2345  notifyEgressShutdown = true;
2346  } else if (!writesDraining_) {
2347  writesDraining_ = true;
2348  notifyEgressShutdown = true;
2349  } // else writes are already draining; don't double notify
2350  }
2351 
2352  if (shutdownReads && !readsShutdown()) {
2353  notifyIngressShutdown = true;
2354  // TODO: send an RST if readBuf_ is non empty?
2355  sock_->setReadCB(nullptr);
2357  if (!transactions_.empty() && error == kErrorConnectionReset) {
2358  if (infoCallback_ != nullptr) {
2359  infoCallback_->onIngressError(*this, error);
2360  }
2361  } else if (error == kErrorEOF) {
2362  // Report to the codec that the ingress stream has ended
2363  codec_->onIngressEOF();
2364  if (infoCallback_) {
2366  }
2367  }
2368  // Once reads are shutdown the parser should stop processing
2369  codec_->setParserPaused(true);
2370  }
2371 
2372  if (notifyIngressShutdown || notifyEgressShutdown) {
  // The exception direction reflects which side(s) were shut down above.
2373  auto dir = (notifyIngressShutdown && notifyEgressShutdown)
2375  : (notifyIngressShutdown ? HTTPException::Direction::INGRESS
2377  HTTPException ex(
2378  dir,
2379  folly::to<std::string>("Shutdown transport: ", getErrorString(error),
2380  errorMsg.empty() ? "" : " ", errorMsg, ", ",
2381  getPeerAddress().describe()));
2382  ex.setProxygenError(error);
2384  }
2385 
2386  // Close the socket only after the onError() callback on the txns
2387  // and handler has been detached.
2388  checkForShutdown();
2389 }
2390 
// Abortive shutdown: uninstalls the read callback, detaches every pending
// write segment, flags the socket to be reset on shutdown, errors out all
// transactions with (errorCode, errorMsg), drains byte events and finally
// calls checkForShutdown().
// NOTE(review): listing lines 2391 (return type / name), 2399, 2403-2404,
// 2410 and 2426 are missing from this extract; the name is taken from the
// VLOG text below -- confirm.
2392  ProxygenError errorCode,
2393  const std::string& errorMsg) {
  // Keeps `this` alive while callbacks below run.
2394  DestructorGuard guard(this);
2395  VLOG(4) << "shutdownTransportWithReset";
2396 
2397  if (!readsShutdown()) {
2398  sock_->setReadCB(nullptr);
2400  }
2401 
2402  if (!writesShutdown()) {
  // Detach each pending write segment and keep the active-write counter in
  // step. NOTE(review): presumably detach() unlinks the segment from
  // pendingWrites_, otherwise this loop would not terminate -- confirm.
2405  while (!pendingWrites_.empty()) {
2406  pendingWrites_.front().detach();
2407  numActiveWrites_--;
2408  }
2409  VLOG(4) << *this << " cancel write timer";
2411  resetSocketOnShutdown_ = true;
2412  }
2413 
2414  errorOnAllTransactions(errorCode, errorMsg);
2415  // drainByteEvents() can call detach(txn), which can in turn call
2416  // shutdownTransport if we were already draining. To prevent double
2417  // calling onError() to the transactions, we call drainByteEvents()
2418  // after we've given the explicit error.
2419  if (byteEventTracker_) {
2420  byteEventTracker_->drainByteEvents();
2421  }
2422 
2423  // HTTPTransaction::onError could theoretically schedule more callbacks,
2424  // so do this last.
2425  if (isLoopCallbackScheduled()) {
2427  }
2428  // onError() callbacks or drainByteEvents() could result in txns detaching
2429  // due to CallbackGuards going out of scope. Close the socket only after
2430  // the txns are detached.
2431  checkForShutdown();
2432 }
2433 
// Destroys the session once it is fully shut down: clears the read and buffer
// callbacks, closes the socket (with a reset when resetSocketOnShutdown_ is
// set, otherwise a normal closeNow()) and calls destroy().
// NOTE(review): listing lines 2435 (signature), 2444 (the remainder of the
// if-condition) and 2451 are missing from this extract, so the full set of
// preconditions is not visible here. Name presumed to be
// HTTPSession::checkForShutdown() from the log text -- confirm.
2434 void
2436  VLOG(10) << *this << " checking for shutdown, readShutdown="
2437  << readsShutdown() << ", writesShutdown=" << writesShutdown()
2438  << ", transaction set empty=" << transactions_.empty();
2439 
2440  // Two conditions are required to destroy the HTTPSession:
2441  // * All writes have been finished.
2442  // * There are no transactions remaining on the session.
2443  if (writesShutdown() && transactions_.empty() &&
2445  VLOG(4) << "destroying " << *this;
2446  sock_->setReadCB(nullptr);
  // Only applies when the underlying transport is a folly::AsyncSocket.
2447  auto asyncSocket = sock_->getUnderlyingTransport<folly::AsyncSocket>();
2448  if (asyncSocket) {
2449  asyncSocket->setBufferCallback(nullptr);
2450  }
2452  if (resetSocketOnShutdown_) {
2453  sock_->closeWithReset();
2454  } else {
2455  sock_->closeNow();
2456  }
2457  destroy();
2458  }
2459 }
2460 
// Puts the session into draining mode (idempotent: a second call only logs).
// drainImpl() runs right away when allTransactionsStarted() is true, and an
// upstream session with no transactions is shut down immediately.
// NOTE(review): listing lines 2462 (signature) and 2466 are missing from this
// extract; presumably HTTPSession::drain() -- confirm.
2461 void
2463  if (!draining_) {
2464  VLOG(4) << *this << " draining";
2465  draining_ = true;
2467 
2468  if (allTransactionsStarted()) {
2469  drainImpl();
2470  }
2471  if (transactions_.empty() && isUpstream()) {
2472  // We don't do this for downstream since we need to wait for
2473  // inflight requests to arrive
2474  VLOG(4) << *this << " shutdown from drain";
2475  shutdownTransport(true, true);
2476  }
2477  } else {
2478  VLOG(4) << *this << " already draining";
2479  }
2480 }
2481 
// Drain helper that acts only when the codec is reusable or waiting to drain,
// and schedules a write only once the session has started (started_).
// NOTE(review): listing lines 2482 (signature), 2484 and 2489-2491 are missing
// from this extract. Per the comment below, the missing statements appear to
// generate the GOAWAY frame; the name is presumed to be
// HTTPSession::drainImpl() -- confirm both.
2483  if (codec_->isReusable() || codec_->isWaitingToDrain()) {
2485  // For HTTP/2, if we haven't started yet then we cannot send a GOAWAY frame
2486  // since we haven't sent the initial SETTINGS frame. Defer sending that
2487  // GOAWAY until the initial SETTINGS is sent.
2488  if (started_) {
2492  scheduleWrite();
2493  }
2494  }
2495 }
2496 
// Returns whether a draining session should shut down now. draining_ is a
// required condition; the rest of the expression is only partly visible.
// NOTE(review): listing lines 2497 (signature) and 2499-2500 (start of the
// parenthesised sub-expression) are missing from this extract; presumably
// HTTPSession::shouldShutdown() const -- confirm.
2498  return draining_ &&
2501  isUpstream() ||
2502  !codec_->isReusable());
2503 }
2504 
// Asks the codec to serialize a ping request into writeBuf_ and schedules a
// write when anything was generated.
// Returns the number of bytes the codec produced (0 means nothing was queued).
// NOTE(review): the signature line (listing line 2505) is missing from this
// extract; presumably HTTPSession::sendPing() -- confirm.
2506  const size_t bytes = codec_->generatePingRequest(writeBuf_);
2507  if (bytes) {
2508  scheduleWrite();
2509  }
2510  return bytes;
2511 }
2512 
// Creates a new stream on the codec and sends priority information `pri` for
// it. Returns the new stream id, or 0 when the codec does not support
// parallel requests (in which case nothing is created or sent).
// NOTE(review): the signature line (listing line 2513) is missing from this
// extract; presumably the HTTPSession::sendPriority(pri) overload -- confirm.
2514  if (!codec_->supportsParallelRequests()) {
2515  // For HTTP/1.1, don't call createStream()
2516  return 0;
2517  }
2518  auto id = codec_->createStream();
2519  sendPriority(id, pri);
2520  return id;
2521 }
2522 
// Sends priority information for stream `id` by delegating to
// sendPriorityImpl() and returns that call's result.
// NOTE(review): listing lines 2523 (return type / name / first parameter) and
// 2526 (a statement between the call and the return) are missing from this
// extract; presumably the HTTPSession::sendPriority(id, pri) overload --
// confirm.
2524  http2::PriorityUpdate pri) {
2525  auto res = sendPriorityImpl(id, pri);
2527  return res;
2528 }
2529 
2530 
// Asks the codec to generate a priority frame for stream `id` and schedules a
// write when any bytes were produced. Returns the number of generated bytes.
// Aborts (CHECK_NE) when `id` is 0.
// NOTE(review): listing lines 2531 (return type / name / first parameter) and
// 2535 (the leading arguments of generatePriority) are missing from this
// extract; presumably HTTPSession::sendPriorityImpl(id, pri) -- confirm.
2532  http2::PriorityUpdate pri) {
2533  CHECK_NE(id, 0);
2534  const size_t bytes = codec_->generatePriority(
2536  pri.exclusive,
2537  pri.weight));
2538  if (bytes) {
2539  scheduleWrite();
2540  }
2541  return bytes;
2542 }
2543 
// Looks up `streamID` in transactions_.
// Returns a pointer to the stored transaction, or nullptr when no transaction
// with that id exists. The pointer refers to the element inside
// transactions_.
// NOTE(review): the signature lines (listing lines 2544-2545) are missing
// from this extract; presumably HTTPSession::findTransaction(streamID) --
// confirm.
2546  auto it = transactions_.find(streamID);
2547  if (it == transactions_.end()) {
2548  return nullptr;
2549  } else {
2550  return &it->second;
2551  }
2552 }
2553 
// Constructs a new HTTPTransaction in place inside transactions_, keyed by
// streamID, and returns a pointer to it.
//
// Returns nullptr (and creates nothing) when the socket is no longer good()
// or a transaction with the same id already exists.
//
// After insertion the function propagates priority sampling, records session
// reuse in the stats when earlier transactions were already served, bumps the
// sequence number and updates stream counters.
// NOTE(review): many listing lines are missing from this extract (2554-2556
// the return type / name / first parameter, 2568, 2571, 2573, 2580-2581,
// 2583-2585, 2610, 2612, 2615), including several constructor arguments and
// the bodies of the callbacks fired for the first transaction. Presumably
// HTTPSession::createTransaction(...) -- confirm.
2557  const folly::Optional<HTTPCodec::StreamID>& assocStreamID,
2558  const folly::Optional<HTTPCodec::ExAttributes>& exAttributes,
2559  const http2::PriorityUpdate& priority) {
2560  if (!sock_->good() || transactions_.count(streamID)) {
2561  // Refuse to add a transaction on a closing session or if a
2562  // transaction of that ID already exists.
2563  return nullptr;
2564  }
2565 
  // The session is about to gain its first transaction.
2566  if (transactions_.empty()) {
2567  if (infoCallback_) {
2569  }
2570  if (getConnectionManager()) {
2572  }
2574  }
2575 
  // Construct the transaction directly inside the map (no copy/move).
2576  auto matchPair = transactions_.emplace(
2577  std::piecewise_construct,
2578  std::forward_as_tuple(streamID),
2579  std::forward_as_tuple(
2582  sessionStats_,
2586  priority,
2587  assocStreamID,
2588  exAttributes
2589  ));
2590 
2591  CHECK(matchPair.second) << "Emplacement failed, despite earlier "
2592  "existence check.";
2593 
2594  HTTPTransaction* txn = &matchPair.first->second;
2595 
2596  if (isPrioritySampled()) {
2597  txn->setPrioritySampled(true /* sampled */);
2598  }
2599 
  // Any previously served transaction means this session is being reused.
2600  if (getNumTxnServed() > 0) {
2601  auto stats = txn->getSessionStats();
2602  if (stats != nullptr) {
2603  stats->recordSessionReused();
2604  }
2605  }
2606 
2607  VLOG(5) << *this << " adding streamID=" << txn->getID()
2608  << ", liveTransactions_ was " << liveTransactions_;
2609 
2611  incrementSeqNo();
2613 
2614  if (isUpstream() && !txn->isPushed()) {
2616  // do not count towards MAX_CONCURRENT_STREAMS for PUSH_PROMISE
2617  } else if (!(isDownstream() && txn->isPushed())) {
2618  incomingStreams_++;
2619  }
2620 
2621  return txn;
2622 }
2623 
2624 void
2626  outgoingStreams_++;
2628 }
2629 
// Bookkeeping after the socket reports that `bytesWritten` bytes were written:
// updates byte counters, notifies infoCallback_, processes byte events,
// possibly starts a shutdown, decrements numActiveWrites_ and -- when not
// running inside the loop callback -- updates write state and may run the
// loop callback again to flush more data.
// NOTE(review): listing lines 2631 (signature), 2638, 2641, 2661 and 2681 are
// missing from this extract (the write-timer cancel/refresh statements, a
// statement in the non-reusable branch, and the condition guarding the final
// egress-limit shutdown). The name is presumed to be
// HTTPSession::onWriteSuccess(bytesWritten) from the log text -- confirm.
2630 void
  // Keeps `this` alive: shutdownTransport() below may destroy the session.
2632  DestructorGuard dg(this);
2633  bytesWritten_ += bytesWritten;
2634  transportInfo_.totalBytes += bytesWritten;
2635  CHECK(writeTimeout_.isScheduled());
2636  if (pendingWrites_.empty()) {
2637  VLOG(10) << "Cancel write timer on last successful write";
2639  } else {
2640  VLOG(10) << "Refresh write timer on writeSuccess";
2642  }
2643 
2644  if (infoCallback_) {
2645  infoCallback_->onWrite(*this, bytesWritten);
2646  }
2647 
2648  VLOG(5) << "total bytesWritten_: " << bytesWritten_;
2649 
2650  // processByteEvents will return true if it has been replaced with another
2651  // tracker in the middle and needs to be re-run. Should happen at most
2652  // once. while with no body is intentional
2653  while (byteEventTracker_ &&
2654  byteEventTracker_->processByteEvents(
2655  byteEventTracker_, bytesWritten_)) {} // pass
2656 
2657  if ((!codec_->isReusable() || readsShutdown()) && (transactions_.empty())) {
2658  if (!codec_->isReusable()) {
2659  // Shouldn't happen unless there is a bug. This can only happen when
2660  // someone calls shutdownTransport, but did not specify a reason before.
2662  }
2663  VLOG(4) << *this << " shutdown from onWriteSuccess";
2664  shutdownTransport(true, true);
2665  }
2666  numActiveWrites_--;
2667  if (!inLoopCallback_) {
2668  updateWriteCount();
2669  // safe to resume here:
2670  updateWriteBufSize(-folly::to<int64_t>(bytesWritten));
2671  // PRIO_FIXME: this is done because of the corking business...
2672  // in the future we may want to have a pull model
2673  // whereby the socket asks us for a given amount of
2674  // data to send...
2675  if (numActiveWrites_ == 0 && hasMoreWrites()) {
2676  runLoopCallback();
2677  }
2678  }
2679  onWriteCompleted();
2680 
2682  VLOG(4) << "Egress limit reached, shutting down "
2683  "session (egressed " << bytesWritten_ << ", limit set to "
2684  << egressBytesLimit_ << ")";
2685  shutdownTransport(true, true);
2686  }
2687 }
2688 
// Handles a failed socket write.
//
// bytesWritten: byte count passed in with the failure; forwarded to
//               infoCallback_->onWrite().
// ex:           the socket exception. When it is a folly::SSLException whose
//               SSL error is SSL_ERROR, its message is saved in
//               transportInfo_.sslError.
// NOTE(review): listing lines 2703-2704 -- the statements that follow the
// bookkeeping below -- are missing from this extract.
2689 void
2690 HTTPSession::onWriteError(size_t bytesWritten,
2691  const AsyncSocketException& ex) {
2692  VLOG(4) << *this << " write error: " << ex.what();
2693  if (infoCallback_) {
2694  infoCallback_->onWrite(*this, bytesWritten);
2695  }
2696 
  // sslEx is null unless `ex` is actually an SSLException.
2697  auto sslEx = dynamic_cast<const folly::SSLException*>(&ex);
2698  // Save the SSL error, if there was one. It will be recorded later
2699  if (sslEx && sslEx->getSSLError() == folly::SSLError::SSL_ERROR) {
2700  transportInfo_.sslError = ex.what();
2701  }
2702 
2705 }
2706 
// Completes a deferred egress shutdown: once writes are draining and there
// are neither active writes nor pending write segments, shuts down the write
// side only (shutdownTransport(false, true)). Otherwise does nothing.
// NOTE(review): the signature line (listing line 2708) is missing from this
// extract; presumably HTTPSession::onWriteCompleted() -- confirm.
2707 void
2709  if (!writesDraining_) {
2710  return;
2711  }
2712 
2713  if (numActiveWrites_) {
2714  return;
2715  }
2716 
2717  // Don't shutdown if there might be more writes
2718  if (!pendingWrites_.empty()) {
2719  return;
2720  }
2721 
2722  // All finished draining writes, so shut down the egress
2723  shutdownTransport(false, true);
2724 }
2725 
// Handles a parse error at the session layer: when the error carries a codec
// status code, a copy of its message is prepared as an IOBuf and a write is
// scheduled; the transport is then shut down in both directions.
// NOTE(review): listing lines 2726 (signature), 2731-2732, 2734 and 2738 are
// missing from this extract. The missing lines appear to contain the codec
// call that consumes errorMsg (presumably a GOAWAY) and the condition that
// selects between errorMsg and nullptr. The name is presumed to be
// HTTPSession::onSessionParseError(error) -- confirm.
2727  VLOG(4) << *this << " session layer parse error. Terminate the session.";
2728  if (error.hasCodecStatusCode()) {
2729  std::unique_ptr<folly::IOBuf> errorMsg =
2730  folly::IOBuf::copyBuffer(error.what());
2733  error.getCodecStatusCode(),
2735  std::move(errorMsg) : nullptr);
2736  scheduleWrite();
2737  }
2739  shutdownTransport(true, true);
2740 }
2741 
// Handles a parse error for a stream that has no transaction yet. A write is
// scheduled when the error carries a codec status code, and extra handling
// applies when the codec is no longer reusable.
// NOTE(review): listing lines 2742 (return type / name / first parameter),
// 2746 and 2752 are missing from this extract. 2746 presumably generates the
// stream abort that the scheduled write flushes, and 2752 is the statement
// run for a non-reusable codec -- confirm.
2743  const HTTPException& error) {
2744  VLOG(4) << *this << " parse error with new transaction";
2745  if (error.hasCodecStatusCode()) {
2747  scheduleWrite();
2748  }
2749  if (!codec_->isReusable()) {
2750  // HTTP 1x codec does not support per stream abort so this will
2751  // render the codec not reusable
2753  }
2754 }
2755 
// Pauses the codec's parser unconditionally, then pauses socket reads via
// pauseReadsImpl() unless reads are already not in the unpaused state or the
// (partly visible) condition below says pausing is unnecessary.
// NOTE(review): listing lines 2757 (signature) and 2762 (first half of the
// parenthesised condition) are missing from this extract; presumably
// HTTPSession::pauseReads() -- confirm.
2756 void
2758  // Make sure the parser is paused. Note that if reads are shutdown
2759  // before they are paused, we never make it past the if.
2760  codec_->setParserPaused(true);
2761  if (!readsUnpaused() ||
2763  !ingressLimitExceeded())) {
2764  return;
2765  }
2766  pauseReadsImpl();
2767 }
2768 
// Performs the actual read pause: cancels the session timeout, uninstalls the
// socket read callback and records reads_ as PAUSED.
// NOTE(review): listing lines 2769 (signature) and 2772 (the infoCallback_
// notification) are missing from this extract; presumably
// HTTPSession::pauseReadsImpl() -- confirm.
2770  VLOG(4) << *this << ": pausing reads";
2771  if (infoCallback_) {
2773  }
2774  cancelTimeout();
2775  sock_->setReadCB(nullptr);
2776  reads_ = SocketState::PAUSED;
2777 }
2778 
// Resumes socket reads via resumeReadsImpl() when reads are currently paused
// and the (partly visible) condition below does not forbid it.
// NOTE(review): listing lines 2780 (signature) and 2782 (first half of the
// parenthesised condition) are missing from this extract; presumably
// HTTPSession::resumeReads() -- confirm.
2779 void
2781  if (!readsPaused() ||
2783  ingressLimitExceeded())) {
2784  return;
2785  }
2786  resumeReadsImpl();
2787 }
2788 
// Performs the actual read resume: restarts the session timeout, records
// reads_ as UNPAUSED, unpauses the codec's parser and makes sure the loop
// callback is scheduled on the socket's EventBase.
// NOTE(review): the read callback is not re-installed here; presumably the
// loop callback does that (the tail of a preceding function in this file
// calls sock_->setReadCB(this) when reads are unpaused) -- confirm. The
// signature line (listing line 2789) is missing; presumably
// HTTPSession::resumeReadsImpl().
2790  VLOG(4) << *this << ": resuming reads";
2791  resetTimeout();
2792  reads_ = SocketState::UNPAUSED;
2793  codec_->setParserPaused(false);
2794  if (!isLoopCallbackScheduled()) {
2795  sock_->getEventBase()->runInLoop(this);
2796  }
2797 }
2798 
// Returns true when egress work remains: an active socket write, a pending
// write segment, data in writeBuf_, or one more condition that is not visible
// here.
// NOTE(review): listing lines 2800 (signature) and 2809 (the last operand of
// the || chain; the log line suggests it concerns txnEgressQueue_) are
// missing from this extract; presumably HTTPSession::hasMoreWrites() const --
// confirm.
2799 bool
2801  VLOG(10) << __PRETTY_FUNCTION__
2802  << " numActiveWrites_: " << numActiveWrites_
2803  << " pendingWrites_.empty(): " << pendingWrites_.empty()
2804  << " pendingWrites_.size(): " << pendingWrites_.size()
2805  << " txnEgressQueue_.empty(): " << txnEgressQueue_.empty();
2806 
2807  return (numActiveWrites_ != 0) ||
2808  !pendingWrites_.empty() || writeBuf_.front() ||
2810 }
2811 
// Delivers error `err` (with optional extra text `errorMsg`) to every
// transaction currently in transactions_.
// The stream ids are copied into a vector first, so errorOnTransactionIds()
// works from that snapshot rather than iterating transactions_ directly.
// NOTE(review): the return type / name line (listing line 2812) is missing
// from this extract.
2813  ProxygenError err,
2814  const std::string& errorMsg) {
2815  std::vector<HTTPCodec::StreamID> ids;
2816  for (const auto& txn: transactions_) {
2817  ids.push_back(txn.first);
2818  }
2819  errorOnTransactionIds(ids, err, errorMsg);
2820 }
2821 
// For each stream id in `ids`, builds an exception whose message is
// "<error string> on transaction id: <id>" followed by ". <errorMsg>" when
// errorMsg is non-empty, and tags it with `err`.
// NOTE(review): listing lines 2822 (return type / name), 2832 (the start of
// the `ex` declaration) and 2837 (the statement that delivers `ex`,
// presumably errorOnTransactionId(id, ...)) are missing from this extract --
// confirm.
2823  const std::vector<HTTPCodec::StreamID>& ids,
2824  ProxygenError err,
2825  const std::string& errorMsg) {
2826  std::string extraErrorMsg;
2827  if (!errorMsg.empty()) {
2828  extraErrorMsg = folly::to<std::string>(". ", errorMsg);
2829  }
2830 
2831  for (auto id: ids) {
2833  folly::to<std::string>(getErrorString(err),
2834  " on transaction id: ", id,
2835  extraErrorMsg));
2836  ex.setProxygenError(err);
2838  }
2839 }
2840 
// Delivers `ex` to the transaction with stream id `id` via
// HTTPTransaction::onError(). Does nothing when no such transaction exists.
// `ex` is taken by value and moved into the callback.
// NOTE(review): listing lines 2841-2842 (return type / name / first
// parameter) are missing from this extract.
2843  HTTPException ex) {
2844  auto txn = findTransaction(id);
2845  if (txn != nullptr) {
2846  txn->onError(std::move(ex));
2847  }
2848 }
2849 
// Walks txnEgressQueue_ breadth-first and calls resumeEgress() on each
// transaction, stopping early once no transactions remain or the egress limit
// is exceeded. inResume_ is set for the duration of the walk; a pause
// requested meanwhile (pendingPause_) is handled afterwards. Must not be
// re-entered (CHECK(!inResume_)).
// NOTE(review): listing lines 2850 (signature) and 2870 (the statement that
// applies the deferred pause) are missing from this extract; presumably
// HTTPSession::resumeTransactions() -- confirm.
2851  CHECK(!inResume_);
2852  inResume_ = true;
2853  DestructorGuard g(this);
  // Per-node visitor: resume the transaction if there is one. The meaning of
  // the `false` return value is defined by iterateBFS -- TODO confirm.
2854  auto resumeFn = [] (HTTP2PriorityQueue&, HTTPCodec::StreamID,
2855  HTTPTransaction *txn, double) {
2856  if (txn) {
2857  txn->resumeEgress();
2858  }
2859  return false;
2860  };
  // Stop predicate for the traversal.
2861  auto stopFn = [this] {
2862  return (transactions_.empty() || egressLimitExceeded());
2863  };
2864 
2865  txnEgressQueue_.iterateBFS(resumeFn, stopFn, true /* all */);
2866  inResume_ = false;
2867  if (pendingPause_) {
2868  VLOG(3) << "Pausing txn egress for " << *this;
2869  pendingPause_ = false;
2871  }
2872 }
2873 
// Schedules a write because more data can be sent again.
// NOTE(review): listing lines 2874-2875 (signature and a leading statement)
// are missing from this extract; presumably
// HTTPSession::onConnectionSendWindowOpen() -- confirm.
2876  // We can write more now. Schedule a write.
2877  scheduleWrite();
2878 }
2879 
// Reacts to the session being stalled by flow control: logs the stall when
// transactions still have egress queued, asserts that the flow control
// timeout is not already scheduled, and then branches on whether the
// configured flow control timeout duration is non-zero.
// NOTE(review): listing lines 2880 (signature), 2884, 2889, 2893 and 2895 are
// missing from this extract -- i.e. the stats call, the infoCallback_
// notification and both timeout-scheduling branches. Presumably
// HTTPSession::onConnectionSendWindowClosed() -- confirm.
2881  if(!txnEgressQueue_.empty()) {
2882  VLOG(4) << *this << " session stalled by flow control";
2883  if (sessionStats_) {
2885  }
2886  }
2887  DCHECK(!flowControlTimeout_.isScheduled());
2888  if (infoCallback_) {
2890  }
2891  auto timeout = flowControlTimeout_.getTimeoutDuration();
2892  if (timeout != std::chrono::milliseconds(0)) {
2894  } else {
2896  }
2897 }
2898 
// Returns the stream id to acknowledge in a graceful GOAWAY: the codec's last
// incoming stream id when the codec is not reusable or is waiting to drain,
// otherwise the maximum possible stream id (per the comment below).
// NOTE(review): listing lines 2899 (signature) and 2907 (the final return
// statement) are missing from this extract; presumably
// HTTPSession::getGracefulGoawayAck() const -- confirm.
2900  if (!codec_->isReusable() || codec_->isWaitingToDrain()) {
2901  // TODO: just track last stream ID inside HTTPSession since this logic
2902  // is shared between HTTP/2 and SPDY
2903  return codec_->getLastIncomingStreamID();
2904  }
2905  VLOG(4) << *this << " getGracefulGoawayAck is reusable and not draining";
2906  // return the maximum possible stream id
2908 }
2909 
// Reports an invalid stream: builds an exception with the message
// "invalid stream=<stream>", attaches codec status `code`, and passes it to
// onError(stream, err, true). On codecs without parallel request support it
// only logs an error and returns.
// NOTE(review): listing lines 2910 (signature) and 2916 (the start of the
// `err` declaration) are missing from this extract; presumably
// HTTPSession::invalidStream(stream, code) -- confirm.
2911  if (!codec_->supportsParallelRequests()) {
2912  LOG(ERROR) << "Invalid stream on non-parallel codec.";
2913  return;
2914  }
2915 
2917  folly::to<std::string>("invalid stream=", stream));
2918  // TODO: Below line will change for HTTP/2 -- just call a const getter
2919  // function for the status code.
2920  err.setCodecStatusCode(code);
2921  onError(stream, err, true);
2922 }
2923 
// Forwards a ping reply latency to infoCallback_->onPingReplySent().
// Negative latencies are ignored, as is the case where no callback is set.
// NOTE(review): the signature line (listing line 2924) is missing from this
// extract; presumably HTTPSession::onPingReplyLatency(latency) -- confirm.
2925  if (infoCallback_ && latency >= 0) {
2926  infoCallback_->onPingReplySent(latency);
2927  }
2928 }
2929 
// When reads are already shut down, requests a transport shutdown: reads
// always, writes only if no transactions remain.
// NOTE(review): the signature line (listing line 2930) is missing from this
// extract; presumably HTTPSession::onDeleteAckEvent() -- confirm.
2931  if (readsShutdown()) {
2932  shutdownTransport(true, transactions_.empty());
2933  }
2934 }
2935 
2937  if (infoCallback_) {
2939  }
2940 }
2941 
2943  if (infoCallback_) {
2945  }
2946 }
2947 
// Clears the socket's replay-safety callback, then invokes onReplaySafe() on
// every callback in waitingForReplaySafety_ and empties that list.
// Aborts (CHECK) when sock_ is null.
// NOTE(review): listing lines 2948 (signature) and 2953 (the infoCallback_
// notification) are missing from this extract; presumably
// HTTPSession::onReplaySafe() -- confirm.
2949  CHECK(sock_);
2950  sock_->setReplaySafetyCallback(nullptr);
2951 
2952  if (infoCallback_) {
2954  }
2955 
2956  for (auto callback : waitingForReplaySafety_) {
2957  callback->onReplaySafe();
2958  }
2959  waitingForReplaySafety_.clear();
2960 }
2961 
// Registers an ack byte event for `txn` at the socket's current raw byte
// offset.
//
// Does nothing unless EOR tracking is enabled on the socket and `eomTracked`
// is true. Also does nothing (beyond logging) when `eomOffset` differs from
// the number of application bytes the socket reports as written.
// NOTE(review): the return type / name line (listing line 2962) is missing
// from this extract. byteEventTracker_ is dereferenced without a null check
// here, while other functions in this file test it first -- confirm it
// cannot be null on this path.
2963  HTTPTransaction* txn, uint64_t eomOffset, bool eomTracked) noexcept {
2964  if (!sock_->isEorTrackingEnabled() || !eomTracked) {
2965  return;
2966  }
2967 
2968  if (eomOffset != sock_->getAppBytesWritten()) {
2969  VLOG(2) << "tracking ack to last app byte " << eomOffset
2970  << " while " << sock_->getAppBytesWritten()
2971  << " app bytes have already been written";
2972  return;
2973  }
2974 
2975  VLOG(5) << "tracking raw last byte " << sock_->getRawBytesWritten()
2976  << " while the app last byte is " << eomOffset;
2977 
2978  byteEventTracker_->addAckByteEvent(sock_->getRawBytesWritten(), txn);
2979 }
2980 
2981 
2982 
2983 } // proxygen
size_t sendAbort(HTTPTransaction *txn, ErrorCode statusCode) noexceptoverride
size_t sendBody(HTTPTransaction *txn, std::unique_ptr< folly::IOBuf >, bool includeEOM, bool trackLastByteFlushed) noexceptoverride
size_t getPipelineStreamCount() const
Definition: HTTPSession.h:835
bool getCurrentTransportInfo(wangle::TransportInfo *tinfo) override
#define FOLLY_SCOPED_TRACE_SECTION(arg,...)
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
bool readsUnpaused() const
Definition: HTTPSession.h:626
HTTPTransaction * createTransaction(HTTPCodec::StreamID streamID, const folly::Optional< HTTPCodec::StreamID > &assocStreamID, const folly::Optional< HTTPCodec::ExAttributes > &exAttributes, const http2::PriorityUpdate &priority=http2::DefaultPriority)
virtual const HTTPSettings * getIngressSettings() const
Definition: HTTPCodec.h:657
virtual void setHandler(Handler *handler)
void onCertificate(uint16_t certId, std::unique_ptr< folly::IOBuf > authenticator) override
void shutdownTransport(bool shutdownReads=true, bool shutdownWrites=true, const std::string &errorMsg="")
void onError(const HTTPException &error)
size_t sendSettings() override
virtual size_t onIngress(const folly::IOBuf &buf)=0
const folly::IOBuf * front() const
Definition: IOBufQueue.h:476
folly::Optional< HTTPCodec::StreamID > getAssocTxnId() const
bool egressLimitExceeded() const
virtual size_t generatePingReply(folly::IOBufQueue &, uint64_t)
Definition: HTTPCodec.h:584
static const folly::Optional< uint8_t > NoPadding
Definition: HTTPCodec.h:53
int64_t length_
Definition: JSONSchema.cpp:233
virtual void onActivateConnection(const HTTPSessionBase &)
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
bool onNativeProtocolUpgradeImpl(HTTPCodec::StreamID txn, std::unique_ptr< HTTPCodec > codec, const std::string &protocolString)
void invokeOnAllTransactions(void(HTTPTransaction::*fn)(Args1...), Args2 &&...args)
Definition: HTTPSession.h:584
virtual bool closeOnEgressComplete() const =0
void errorOnTransactionId(HTTPCodec::StreamID id, HTTPException ex)
virtual StreamID createStream()=0
folly::AsyncTransportWrapper * getTransport() override
Definition: HTTPSession.h:61
virtual HTTPSettings * getEgressSettings()
Definition: HTTPCodec.h:653
void setSessionStats(HTTPSessionStats *stats) override
size_t chainLength() const
Definition: IOBufQueue.h:492
spdy::GoawayStatusCode statusCode
Definition: SPDYCodec.cpp:110
virtual bool supportsSessionFlowControl() const
Definition: HTTPCodec.h:374
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
Definition: Types-inl.h:220
void onPingReply(uint64_t uniqueID) override
void onPingReplyLatency(int64_t latency) noexceptoverride
void onEgressBuffered() override
HTTPCodecFilterChain codec_
HTTPTransaction * findTransaction(HTTPCodec::StreamID streamID)
void onDeleteAckEvent() noexceptoverride
size_t receiveSessionWindowSize_
Definition: HTTPSession.h:976
bool isBusy() const override
void resumeIngress(HTTPTransaction *txn) noexceptoverride
virtual bool supportsPushTransactions() const =0
void onEgressMessageFinished(HTTPTransaction *txn, bool withRST=false)
void setReadBufferLimit(uint32_t limit)
virtual void onWrite(const HTTPSessionBase &, size_t)
virtual void generateHeader(folly::IOBufQueue &writeBuf, StreamID stream, const HTTPMessage &msg, bool eom=false, HTTPHeaderSize *size=nullptr)=0
LogLevel max
Definition: LogLevel.cpp:31
void writeSuccess() noexceptoverride
Definition: HTTPSession.cpp:84
const SocketAddress peerAddr
Definition: TestUtils.cpp:20
FlowControlFilter * connFlowControl_
Definition: HTTPSession.h:913
virtual void setReceiveWindow(uint32_t capacity)
virtual void onSettingsOutgoingStreamsNotFull(const HTTPSessionBase &)
void onNewTransactionParseError(HTTPCodec::StreamID streamID, const HTTPException &error)
void errorOnAllTransactions(ProxygenError err, const std::string &errorMsg)
void setReceiveWindowSize(folly::IOBufQueue &writeBuf, uint32_t capacity)
std::unique_ptr< folly::IOBuf > getNextToSend(bool *cork, bool *eom)
size_t getCodecSendWindowSize() const
bool hasCodecStatusCode() const
Definition: HTTPException.h:95
void sendHeaders(HTTPTransaction *txn, const HTTPMessage &headers, HTTPHeaderSize *size, bool includeEOM) noexceptoverride
HTTP2PriorityQueue::NextEgressResult nextEgressResults_
Definition: HTTPSession.h:964
void readErr(const folly::AsyncSocketException &) noexceptoverride
size_t sendCertificateRequest(std::unique_ptr< folly::IOBuf > certificateRequestContext, std::vector< fizz::Extension > extensions) override
WriteTimeout writeTimeout_
Definition: HTTPSession.h:681
virtual void onEgressBuffered(const HTTPSessionBase &)
void writeTimeoutExpired() noexcept
void onWriteError(size_t bytesWritten, const folly::AsyncSocketException &ex)
size_t receiveStreamWindowSize_
Definition: HTTPSession.h:975
void onPingRequest(uint64_t uniqueID) override
void onCertificateRequest(uint16_t requestId, std::unique_ptr< folly::IOBuf > authRequest) override
void dropConnection() override
static uint32_t maxReadBufferSize_
bool isBufferMovable() noexceptoverride
virtual size_t generateChunkTerminator(folly::IOBufQueue &writeBuf, StreamID stream)=0
std::enable_if< std::is_constructible< C >::value >::type addFilters()
Definition: FilterChain.h:301
virtual void recordSessionStalled() noexcept=0
void reset(bool useFlowControl, uint32_t receiveInitialWindowSize, uint32_t receiveStreamWindowSize, uint32_t sendInitialWindowSize)
bool empty() const
Definition: IOBufQueue.h:503
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
virtual void onIngressError(const HTTPSessionBase &, ProxygenError)
bool getCurrentTransportInfoWithoutUpdate(wangle::TransportInfo *tinfo) const override
std::chrono::milliseconds getTimeoutDuration() const
Definition: HTTPSession.h:1009
virtual HTTPTransaction::Handler * getTransactionTimeoutHandler(HTTPTransaction *txn)=0
bool readsPaused() const
Definition: HTTPSession.h:630
virtual std::chrono::milliseconds getGracefulShutdownTimeout() const
void flowControlTimeoutExpired() noexcept
std::chrono::milliseconds getDefaultTimeout() const
CodecFactory codec
void setNewTransactionPauseState(HTTPCodec::StreamID streamID)
bool writesPaused() const
Definition: HTTPSession.h:642
const uint8_t * data() const
Definition: IOBuf.h:499
void onConnectionSendWindowOpen() override
void setProxygenError(ProxygenError proxygenError)
Definition: Exception.h:46
virtual CodecProtocol getProtocol() const =0
size_t sendWindowUpdate(HTTPTransaction *txn, uint32_t bytes) noexceptoverride
folly::AsyncTransportWrapper::UniquePtr sock_
Definition: HTTPSession.h:697
#define FOLLY_NULLABLE
folly::IOBufQueue readBuf_
Definition: HTTPSession.h:687
void readTimeoutExpired() noexcept
void setMaxConcurrentIncomingStreams(uint32_t num) override
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
HTTPSessionStats * getSessionStats() const
virtual void generatePushPromise(folly::IOBufQueue &, StreamID, const HTTPMessage &, StreamID, bool, HTTPHeaderSize *)
Definition: HTTPCodec.h:492
virtual void setupOnHeadersComplete(HTTPTransaction *txn, HTTPMessage *msg)=0
HTTP2PriorityQueue txnEgressQueue_
Definition: HTTPSession.h:690
virtual void onCreate(const HTTPSessionBase &)
static http_parser_settings settings
Definition: test.c:1529
void setCodecStatusCode(ErrorCode statusCode)
Definition: HTTPException.h:98
uint32_t maxConcurrentIncomingStreams_
Definition: HTTPSession.h:928
void setByteEventTracker(std::shared_ptr< ByteEventTracker > byteEventTracker)
const folly::SocketAddress & getLocalAddress() const noexceptoverride
Definition: HTTPSession.h:122
std::string describe() const
uint8_t getPriority() const
Definition: HTTPMessage.h:590
void updateSessionBytesSheduled(uint64_t bytes)
void onNewOutgoingStream(uint32_t outgoingStreams)
requires E e noexcept(noexcept(s.error(std::move(e))))
void onIngressChunkHeader(size_t length)
SSLResumeEnum sslResume
void decrementTransactionCount(HTTPTransaction *txn, bool ingressEOM, bool egressEOM)
std::set< HTTPCodec::StreamID > getExTransactions() const
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
Definition: error.h:48
tuple make_tuple()
Definition: gtest-tuple.h:675
void startNow() override
virtual size_t generateBody(folly::IOBufQueue &writeBuf, StreamID stream, std::unique_ptr< folly::IOBuf > chain, folly::Optional< uint8_t > padding, bool eom)=0
virtual size_t generateRstStream(folly::IOBufQueue &writeBuf, StreamID stream, ErrorCode code)=0
void updatePendingWriteSize(int64_t delta)
void onSetSendWindow(uint32_t windowSize)
void onActivated(ManagedConnection &conn) override
uint32_t maxConcurrentOutgoingStreamsRemote_
Definition: HTTPSession.h:922
virtual size_t generateTrailers(folly::IOBufQueue &writeBuf, StreamID stream, const HTTPHeaders &trailers)=0
size_t sendPing() override
void scheduleTimeout(folly::HHWheelTimer::Callback *callback, std::chrono::milliseconds timeout)
static const uint32_t kMinReadSize
bool writesUnpaused() const
Definition: HTTPSession.h:638
bool isUpstream() const
FilterChain< T1, T2, FilterType, set_callback, TakeOwnership > & add(Args &&...args)
Definition: FilterChain.h:341
virtual void onFullHandshakeCompletion(const HTTPSessionBase &)
const folly::SocketAddress & getPeerAddress() const noexceptoverride
Definition: HTTPSession.h:126
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
HTTPSessionBase(const folly::SocketAddress &localAddr, const folly::SocketAddress &peerAddr, HTTPSessionController *controller, const wangle::TransportInfo &tinfo, InfoCallback *infoCallback, std::unique_ptr< HTTPCodec > codec)
virtual bool isWaitingToDrain() const =0
static void destroy()
const std::set< HTTPCodec::StreamID > & getPushedTransactions() const
void commonEom(HTTPTransaction *txn, size_t encodedSize, bool piggybacked) noexcept
bool maybeResumePausedPipelinedTransaction(size_t oldStreamCount, uint32_t txnSeqn)
WheelTimerInstance timeout_
Definition: HTTPSession.h:699
WriteSegmentList pendingWrites_
Definition: HTTPSession.h:908
std::unique_ptr< ShutdownTransportCallback > shutdownTransportCb_
Definition: HTTPSession.h:998
virtual void onIngressMessage(const HTTPSessionBase &, const HTTPMessage &)
void notifyIngressBodyProcessed(uint32_t bytes) noexceptoverride
void onSetMaxInitiatedStreams(uint32_t maxTxns)
void handler(int, siginfo_t *, void *)
void readEOF() noexceptoverride
void onIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
uint64_t bodyBytesPerWriteBuf_
Definition: HTTPSession.h:959
bool getHTTP2PrioritiesEnabled() const override
Definition: HTTPSession.h:114
ConnectionCloseReason getConnectionCloseReason() const
static void handleLastByteEvents(ByteEventTracker *byteEventTracker, HTTPTransaction *txn, size_t encodedSize, size_t byteOffset, bool piggybacked)
HTTPCodec::StreamID getGracefulGoawayAck() const
bool initWithSocket(const folly::AsyncSocket *sock)
void onGoaway(ErrorCode code)
bool isLoopCallbackScheduled() const
Definition: EventBase.h:160
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
bool onBodyImpl(std::unique_ptr< folly::IOBuf > chain, size_t length, uint16_t padding, HTTPTransaction *txn)
void onDeactivated(ManagedConnection &conn) override
void notifyPendingEgress() noexceptoverride
virtual void onIngressEOF()=0
void onSettingsAck() override
virtual const std::string & getUserAgent() const =0
std::chrono::milliseconds setupTime
HTTPSessionController * getController()
void setTimeoutDuration(std::chrono::milliseconds duration)
Definition: HTTPSession.h:1013
void writeBuf(const Buf &buf, folly::io::Appender &out)
virtual void onHeadersSent(const HTTPMessage &, bool)
Definition: HTTPSession.h:300
virtual size_t generatePriority(folly::IOBufQueue &, StreamID, const HTTPMessage::HTTPPriority &)
Definition: HTTPCodec.h:620
void describe(std::ostream &os) const override
virtual bool supportsStreamFlowControl() const
Definition: HTTPCodec.h:367
int64_t pendingWriteSizeDelta_
Definition: HTTPSession.h:954
LogLevel min
Definition: LogLevel.cpp:30
uint64_t getNumTxnServed() const
void onWindowUpdate(HTTPCodec::StreamID stream, uint32_t amount) override
static Options cacheChainLength()
Definition: IOBufQueue.h:83
void setCloseReason(ConnectionCloseReason reason)
void onIngressWindowUpdate(uint32_t amount)
virtual void onTransactionDetached(const HTTPSessionBase &)
virtual void onSettingsOutgoingStreamsFull(const HTTPSessionBase &)
virtual size_t generateCertificate(folly::IOBufQueue &, uint16_t, std::unique_ptr< folly::IOBuf >)
Definition: HTTPCodec.h:642
virtual void onSettingsAck(const HTTPSessionBase &)
bool onPushedTransaction(HTTPTransaction *txn)
void drain() override
void handleErrorDirectly(HTTPTransaction *txn, const HTTPException &error)
virtual void generateExHeader(folly::IOBufQueue &, StreamID, const HTTPMessage &, const HTTPCodec::ExAttributes &, bool, HTTPHeaderSize *)
Definition: HTTPCodec.h:499
void onSettings(const SettingsList &settings) override
size_t sendEOM(HTTPTransaction *txn, const HTTPHeaders *trailers) noexceptoverride
bool isConnWindowFull() const
Definition: HTTPSession.h:765
folly::IntrusiveListHook listHook
Definition: HTTPSession.h:893
void onMessageComplete(HTTPCodec::StreamID streamID, bool upgrade) override
void setFlowControl(size_t initialReceiveWindow, size_t receiveStreamWindowSize, size_t receiveSessionWindowSize) override
const T * getUnderlyingTransport() const
HTTPTransaction * newExTransaction(HTTPTransaction::Handler *handler, HTTPCodec::StreamID controlStream, bool unidirectional=false) noexceptoverride
std::size_t length() const
Definition: IOBuf.h:533
HTTPTransactionEgressSM::State getEgressState() const
const std::string & getCodecProtocolString(CodecProtocol proto)
virtual void recordTTLBAIOBSplitByEom() noexcept=0
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
http2::PriorityUpdate getMessagePriority(const HTTPMessage *msg)
void onHeadersComplete(HTTPCodec::StreamID streamID, std::unique_ptr< HTTPMessage > msg) override
ErrorCode getCodecStatusCode() const
const char * getErrorCodeString(ErrorCode error)
Definition: ErrorCode.cpp:18
void onConnectionSendWindowClosed() override
HTTPCodec::StreamID sendPriority(http2::PriorityUpdate pri) override
virtual size_t generateConnectionPreface(folly::IOBufQueue &)
Definition: HTTPCodec.h:475
virtual size_t generateSettingsAck(folly::IOBufQueue &)
Definition: HTTPCodec.h:600
virtual uint32_t getDefaultWindowSize() const
Definition: HTTPCodec.h:680
void errorOnTransactionIds(const std::vector< HTTPCodec::StreamID > &ids, ProxygenError err, const std::string &extraErrorMsg="")
void onAbort(HTTPCodec::StreamID streamID, ErrorCode code) override
virtual size_t generateChunkHeader(folly::IOBufQueue &writeBuf, StreamID stream, size_t length)=0
bool shouldShutdown() const
void setEgressSettings(const SettingsList &inSettings) override
void setCallback(T2 *cb) override
Definition: FilterChain.h:235
HTTPTransaction * newPushedTransaction(HTTPCodec::StreamID assocStreamId, HTTPTransaction::PushHandler *handler) noexcept override
void removePushedTransaction(HTTPCodec::StreamID pushStreamId)
virtual StreamID mapPriorityToDependency(uint8_t) const
Definition: HTTPCodec.h:706
std::chrono::microseconds rtt
Definition: TransportInfo.h:70
folly::Optional< HTTPPriority > getHTTP2Priority() const
Definition: HTTPMessage.h:594
bool readsShutdown() const
Definition: HTTPSession.h:634
const Handler * getHandler() const
void onChunkHeader(HTTPCodec::StreamID stream, size_t length) override
static const char *const value
Definition: Conv.cpp:50
virtual void recordSessionReused() noexcept=0
ConnectionManager * getConnectionManager()
uint32_t getSequenceNumber() const
bool writesShutdown() const
Definition: HTTPSession.h:646
uint32_t liveTransactions_
Definition: HTTPSession.h:695
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
virtual size_t generateEOM(folly::IOBufQueue &writeBuf, StreamID stream)=0
virtual void onRead(const HTTPSessionBase &, size_t)
bool ingressLimitExceeded() const
void writeErr(size_t bytesWritten, const folly::AsyncSocketException &) noexcept override
void runLoopCallback() noexcept override
void invalidStream(HTTPCodec::StreamID stream, ErrorCode code=ErrorCode::_SPDY_INVALID_STREAM)
virtual bool extraResponseExpected() const
folly::Optional< HTTPCodec::StreamID > getControlStream() const
const PriorityUpdate DefaultPriority
Definition: HTTP2Framer.cpp:21
std::unique_ptr< T1 > setDestination(std::unique_ptr< T1 > destination)
Definition: FilterChain.h:260
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
std::map< HTTPCodec::StreamID, HTTPTransaction > transactions_
Definition: HTTPSession.h:692
void readDataAvailable(size_t readSize) noexcept override
std::shared_ptr< std::string > sslCipher
void notifyPendingShutdown() override
size_t sendPriorityImpl(HTTPCodec::StreamID streamID, http2::PriorityUpdate pri)
std::tuple< uint32_t, bool, uint8_t > HTTPPriority
Definition: HTTPMessage.h:592
SteadyClock::time_point TimePoint
Definition: Time.h:25
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
Definition: ScopeGuard.h:184
void readBufferAvailable(std::unique_ptr< folly::IOBuf >) noexcept override
HTTPSessionStats * sessionStats_
virtual void onIngressPaused(const HTTPSessionBase &)
std::vector< HTTPSetting > SettingsList
Definition: HTTPSettings.h:81
virtual StreamID getLastIncomingStreamID() const
Definition: HTTPCodec.h:675
void notifyEgressBodyBuffered(int64_t bytes) noexcept override
folly::IOBufQueue writeBuf_
Definition: HTTPSession.h:684
std::list< ReplaySafetyCallback * > waitingForReplaySafety_
Definition: HTTPSession.h:846
void getReadBuffer(void **buf, size_t *bufSize) override
void onReplaySafe() noexcept override
void onBody(HTTPCodec::StreamID streamID, std::unique_ptr< folly::IOBuf > chain, uint16_t padding) override
bool isSpdyCodecProtocol(CodecProtocol protocol)
virtual bool allTransactionsStarted() const =0
size_t sendChunkHeader(HTTPTransaction *txn, size_t length) noexcept override
void setSecondAuthManager(std::unique_ptr< SecondaryAuthManager > secondAuthManager)
void onPriority(HTTPCodec::StreamID stream, const HTTPMessage::HTTPPriority &) override
const char * string
Definition: Conv.cpp:212
bool verifyCertAuthSetting(uint32_t value)
std::shared_ptr< ByteEventTracker > byteEventTracker_
Definition: HTTPSession.h:966
g_t g(f_t)
void onSessionParseError(const HTTPException &error)
void onGoaway(uint64_t lastGoodStreamID, ErrorCode code, std::unique_ptr< folly::IOBuf > debugData=nullptr) override
void setEgressBytesLimit(uint64_t bytesLimit)
void setPrioritySampled(bool sampled)
virtual void onRequestEnd(const HTTPSessionBase &, uint32_t)
virtual void onSettings(const HTTPSessionBase &, const SettingsList &)
void onIngressUpgrade(UpgradeProtocol protocol)
void onIngressSetSendWindow(uint32_t newWindowSize)
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
uint64_t egressBytesLimit_
Definition: HTTPSession.h:971
size_t sendChunkTerminator(HTTPTransaction *txn) noexcept override
const SocketAddress localAddr
Definition: TestUtils.cpp:19
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
WriteSegment(HTTPSession *session, uint64_t length)
Definition: HTTPSession.cpp:63
virtual bool isReusable() const =0
bool notifyBodyProcessed(uint32_t bytes)
void iterateBFS(const std::function< bool(HTTP2PriorityQueue &, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
void onEgressBufferCleared() override
void onMessageBegin(HTTPCodec::StreamID streamID, HTTPMessage *msg) override
void nextEgress(NextEgressResult &result, bool spdyMode=false)
uint32_t getAvailableSend() const
DrainTimeout drainTimeout_
Definition: HTTPSession.h:1033
void setBufferCallback(BufferCallback *cb)
void onChunkComplete(HTTPCodec::StreamID stream) override
static const folly::Optional< StreamID > NoStream
Definition: HTTPCodec.h:51
uint64_t StreamID
Definition: HTTPCodec.h:49
void addOrUpdatePriorityNode(HTTPCodec::StreamID id, http2::PriorityUpdate pri)
HTTPSession(const WheelTimerInstance &timeout, folly::AsyncTransportWrapper::UniquePtr sock, const folly::SocketAddress &localAddr, const folly::SocketAddress &peerAddr, HTTPSessionController *controller, std::unique_ptr< HTTPCodec > codec, const wangle::TransportInfo &tinfo, InfoCallback *infoCallback)
std::shared_ptr< std::string > appProtocol
virtual size_t generateWindowUpdate(folly::IOBufQueue &, StreamID, uint32_t)
Definition: HTTPCodec.h:610
void closeWhenIdle() override
bool isHTTP2CodecProtocol(CodecProtocol protocol)
void updateWriteBufSize(int64_t delta)
void transactionTimeout(HTTPTransaction *txn) noexcept override
virtual size_t generateSettings(folly::IOBufQueue &)
Definition: HTTPCodec.h:592
virtual size_t generateGoaway(folly::IOBufQueue &writeBuf, StreamID lastStream, ErrorCode code, std::unique_ptr< folly::IOBuf > debugData=nullptr)=0
virtual void onEgressBufferCleared(const HTTPSessionBase &)
FlowControlTimeout flowControlTimeout_
Definition: HTTPSession.h:1020
HTTPCodec::StreamID getID() const
virtual void onDeactivateConnection(const HTTPSessionBase &)
virtual void setSessionStats(HTTPSessionStats *stats)
bool ingressBytesProcessed(folly::IOBufQueue &writeBuf, uint32_t delta)
uint32_t getCertAuthSettingVal()
void detach(HTTPTransaction *txn) noexcept override
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
void dumpConnectionState(uint8_t loglevel) override
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
std::chrono::milliseconds sslSetupTime
SecondaryAuthManager * getSecondAuthManager() const
const char * what(void) const noexcept override
Definition: Exception.cpp:26
uint32_t streamID
Definition: SPDYCodec.cpp:131
virtual bool isBusy() const =0
void updateContentionsCount(uint64_t contentions)
virtual void setParserPaused(bool paused)=0
void onTrailersComplete(HTTPCodec::StreamID streamID, std::unique_ptr< HTTPHeaders > trailers) override
void removeExTransaction(HTTPCodec::StreamID exStreamId)
static uint32_t egressBodySizeLimit_
virtual size_t generatePingRequest(folly::IOBufQueue &)
Definition: HTTPCodec.h:576
void shutdownTransportWithReset(ProxygenError errorCode, const std::string &errorMsg="")
bool isDownstream() const
std::unique_ptr< SecondaryAuthManager > secondAuthManager_
Definition: HTTPSession.h:1055
void onPushMessageBegin(HTTPCodec::StreamID streamID, HTTPCodec::StreamID assocStreamID, HTTPMessage *msg) override
virtual bool supportsParallelRequests() const =0
void onError(HTTPCodec::StreamID streamID, const HTTPException &error, bool newTxn) override
virtual void onFlowControlWindowClosed(const HTTPSessionBase &)
bool supportsMoreTransactions() const
bool hasMoreWrites() const
void pauseIngress(HTTPTransaction *txn) noexcept override
virtual TransportDirection getTransportDirection() const =0
const HTTPSetting * getSetting(SettingsId id) const
wangle::TransportInfo transportInfo_
uint64_t sessionByteOffset()
Definition: HTTPSession.h:511
std::unique_ptr< folly::IOBuf > pop_front()
Definition: IOBufQueue.cpp:316
StringPiece label
folly::HHWheelTimer * getWheelTimer() const
virtual size_t generateCertificateRequest(folly::IOBufQueue &, uint16_t, std::unique_ptr< folly::IOBuf >)
Definition: HTTPCodec.h:631
void onWriteSuccess(uint64_t bytesWritten)
void onExMessageBegin(HTTPCodec::StreamID streamID, HTTPCodec::StreamID controlStream, bool unidirectional, HTTPMessage *msg) override
const char * getErrorString(ProxygenError error)
void setSetting(SettingsId id, SettingsValue val)
virtual void onRequestBegin(const HTTPSessionBase &)
static const folly::Optional< ExAttributes > NoExAttributes
Definition: HTTPCodec.h:66
void onLastByteEvent(HTTPTransaction *txn, uint64_t offset, bool eomTracked) noexcept override