proxygen
HTTPTransaction.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 #include <algorithm>
13 #include <folly/Conv.h>
16 #include <glog/logging.h>
20 
21 using folly::IOBuf;
22 using std::unique_ptr;
23 
24 namespace proxygen {
25 
26 namespace {
27  const int64_t kApproximateMTU = 1400;
28  const std::chrono::seconds kRateLimitMaxDelay(10);
29 }
30 
33  uint32_t seqNo,
34  Transport& transport,
35  HTTP2PriorityQueueBase& egressQueue,
36  folly::HHWheelTimer* timer,
37  const
39  defaultTimeout,
40  HTTPSessionStats* stats,
41  bool useFlowControl,
42  uint32_t receiveInitialWindowSize,
43  uint32_t sendInitialWindowSize,
44  http2::PriorityUpdate priority,
47  exAttributes):
48  deferredEgressBody_(folly::IOBufQueue::cacheChainLength()),
49  direction_(direction),
50  id_(id),
51  seqNo_(seqNo),
52  transport_(transport),
53  stats_(stats),
54  recvWindow_(receiveInitialWindowSize),
55  sendWindow_(sendInitialWindowSize),
56  egressQueue_(egressQueue),
57  assocStreamId_(assocId),
58  priority_(priority),
59  ingressPaused_(false),
60  egressPaused_(false),
61  flowControlPaused_(false),
62  handlerEgressPaused_(false),
63  egressRateLimited_(false),
64  useFlowControl_(useFlowControl),
65  aborted_(false),
66  deleting_(false),
67  firstByteSent_(false),
68  firstHeaderByteSent_(false),
69  inResume_(false),
70  inActiveSet_(true),
71  ingressErrorSeen_(false),
72  priorityFallback_(false),
73  headRequest_(false),
74  enableLastByteFlushedTracking_(false),
75  transactionTimeout_(defaultTimeout),
76  timer_(timer) {
77 
78  if (assocStreamId_) {
79  if (isUpstream()) {
80  egressState_ = HTTPTransactionEgressSM::State::SendingDone;
81  } else {
82  ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
83  }
84  }
85 
86  if (exAttributes) {
87  exAttributes_ = exAttributes;
88  if (exAttributes_->unidirectional) {
89  if (isRemoteInitiated()) {
90  egressState_ = HTTPTransactionEgressSM::State::SendingDone;
91  } else {
92  ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
93  }
94  }
95  }
96 
98  if (stats_) {
100  }
101 
102  queueHandle_ = egressQueue_.addTransaction(id_, priority, this, false,
103  &insertDepth_);
104  if(priority.streamDependency != 0 && insertDepth_ == 1) {
105  priorityFallback_ = true;
106  }
107 
109 }
110 
113  || pendingByteEvents_ > 0 || deleting_) {
114  return;
115  }
116  VLOG(4) << "destroying transaction " << *this;
117  deleting_ = true;
118  if (handler_) {
120  handler_ = nullptr;
121  }
122  transportCallback_ = nullptr;
123  const auto bytesBuffered = recvWindow_.getOutstanding();
124  if (bytesBuffered) {
125  transport_.notifyIngressBodyProcessed(bytesBuffered);
126  }
127  transport_.detach(this);
128  (void)delayed; // prevent unused variable warnings
129 }
130 
132  // Cancel transaction timeout if still scheduled.
133  if (isScheduled()) {
134  cancelTimeout();
135  }
136 
137  if (stats_) {
139  }
140  if (isEnqueued()) {
141  dequeue();
142  }
143  // TODO: handle the case where the priority node hangs out longer than
144  // the transaction
146 }
147 
148 void HTTPTransaction::reset(bool useFlowControl,
149  uint32_t receiveInitialWindowSize,
150  uint32_t receiveStreamWindowSize,
151  uint32_t sendInitialWindowSize) {
152  useFlowControl_ = useFlowControl;
153  recvWindow_.setCapacity(receiveInitialWindowSize);
154  setReceiveWindow(receiveStreamWindowSize);
155  sendWindow_.setCapacity(sendInitialWindowSize);
156 }
157 
159  std::unique_ptr<HTTPMessage> msg) {
160  DestructorGuard g(this);
161  msg->setSeqNo(seqNo_);
162  if (isUpstream() && !isPushed() && msg->isResponse()) {
163  lastResponseStatus_ = msg->getStatusCode();
164  }
166  HTTPTransactionIngressSM::Event::onHeaders)) {
167  return;
168  }
169  if (msg->isRequest()) {
170  headRequest_ = (msg->getMethod() == HTTPMethod::HEAD);
171  }
172 
173  if ((msg->isRequest() && msg->getMethod() != HTTPMethod::CONNECT) ||
174  (msg->isResponse() && !headRequest_ &&
175  !RFC2616::responseBodyMustBeEmpty(msg->getStatusCode()))) {
176  // CONNECT payload has no defined semantics
177  const auto& contentLen =
178  msg->getHeaders().getSingleOrEmpty(HTTP_HEADER_CONTENT_LENGTH);
179  if (!contentLen.empty()) {
180  try {
181  expectedContentLengthRemaining_ = folly::to<uint64_t>(contentLen);
182  } catch (const folly::ConversionError& ex) {
183  LOG(ERROR) << "Invalid content-length: " << contentLen
184  << ", ex=" << ex.what() << *this;
185  }
186  }
187  }
188  if (transportCallback_) {
189  transportCallback_->headerBytesReceived(msg->getIngressHeaderSize());
190  }
193  }
194  if (mustQueueIngress()) {
197  std::move(msg));
198  VLOG(4) << "Queued ingress event of type "
199  << HTTPEvent::Type::HEADERS_COMPLETE << " " << *this;
200  } else {
202  }
203 }
204 
206  std::unique_ptr<HTTPMessage> msg) {
207  DestructorGuard g(this);
208  if (aborted_) {
209  return;
210  }
211  refreshTimeout();
212  if (handler_ && !isIngressComplete()) {
214  }
215 }
216 
217 void HTTPTransaction::onIngressBody(unique_ptr<IOBuf> chain,
218  uint16_t padding) {
219  FOLLY_SCOPED_TRACE_SECTION("HTTPTransaction - onIngressBody");
220  DestructorGuard g(this);
221  if (isIngressEOMSeen()) {
223  return;
224  }
225  auto len = chain->computeChainDataLength();
226  if (len == 0) {
227  return;
228  }
230  HTTPTransactionIngressSM::Event::onBody)) {
231  return;
232  }
234  if (expectedContentLengthRemaining_.value() >= len) {
237  } else {
238  auto errorMsg = folly::to<std::string>(
239  "Content-Length/body mismatch: received=",
240  len,
241  " expecting no more than ",
243  LOG(ERROR) << errorMsg << " " << *this;
244  if (handler_) {
247  onError(ex);
248  }
249  return;
250  }
251  }
252  if (transportCallback_) {
254  }
255  // register the bytes in the receive window
256  if (!recvWindow_.reserve(len + padding, useFlowControl_)) {
257  LOG(ERROR) << "recvWindow_.reserve failed with len=" << len
258  << " padding=" << padding
259  << " capacity=" << recvWindow_.getCapacity()
260  << " outstanding=" << recvWindow_.getOutstanding()
261  << " " << *this ;
263  return;
264  } else {
265  CHECK(recvWindow_.free(padding));
266  recvToAck_ += padding;
267  }
268  if (mustQueueIngress()) {
271  std::move(chain));
272  VLOG(4) << "Queued ingress event of type " << HTTPEvent::Type::BODY
273  << " size=" << len << " " << *this;
274  } else {
275  CHECK(recvWindow_.free(len));
276  processIngressBody(std::move(chain), len);
277  }
278 }
279 
280 void HTTPTransaction::processIngressBody(unique_ptr<IOBuf> chain, size_t len) {
281  FOLLY_SCOPED_TRACE_SECTION("HTTPTransaction - processIngressBody");
282  DestructorGuard g(this);
283  if (aborted_) {
284  return;
285  }
286  refreshTimeout();
288  if (handler_) {
289  if (!isIngressComplete()) {
290  handler_->onBody(std::move(chain));
291  }
292 
293  if (useFlowControl_ && !isIngressEOMSeen()) {
294  recvToAck_ += len;
295  if (recvToAck_ > 0) {
296  uint32_t divisor = 2;
297  if (transport_.isDraining()) {
298  // only send window updates for draining transports when window is
299  // closed
300  divisor = 1;
301  }
302  if (uint32_t(recvToAck_) >= (recvWindow_.getCapacity() / divisor)) {
304  }
305  }
306  } // else don't care about window updates
307  }
308 }
309 
312  HTTPTransactionIngressSM::Event::onChunkHeader)) {
313  return;
314  }
315  if (mustQueueIngress()) {
318  VLOG(4) << "Queued ingress event of type " << HTTPEvent::Type::CHUNK_HEADER
319  << " size=" << length << " " << *this;
320  } else {
322  }
323 }
324 
326  DestructorGuard g(this);
327  if (aborted_) {
328  return;
329  }
330  refreshTimeout();
331  if (handler_ && !isIngressComplete()) {
332  handler_->onChunkHeader(length);
333  }
334 }
335 
338  HTTPTransactionIngressSM::Event::onChunkComplete)) {
339  return;
340  }
341  if (mustQueueIngress()) {
344  VLOG(4) << "Queued ingress event of type "
345  << HTTPEvent::Type::CHUNK_COMPLETE << " " << *this;
346  } else {
348  }
349 }
350 
352  DestructorGuard g(this);
353  if (aborted_) {
354  return;
355  }
356  refreshTimeout();
357  if (handler_ && !isIngressComplete()) {
359  }
360 }
361 
362 void HTTPTransaction::onIngressTrailers(unique_ptr<HTTPHeaders> trailers) {
364  HTTPTransactionIngressSM::Event::onTrailers)) {
365  return;
366  }
367  if (mustQueueIngress()) {
370  std::move(trailers));
371  VLOG(4) << "Queued ingress event of type "
372  << HTTPEvent::Type::TRAILERS_COMPLETE << " " << *this;
373  } else {
375  }
376 }
377 
378 void HTTPTransaction::processIngressTrailers(unique_ptr<HTTPHeaders> trailers) {
379  DestructorGuard g(this);
380  if (aborted_) {
381  return;
382  }
383  refreshTimeout();
384  if (handler_ && !isIngressComplete()) {
385  handler_->onTrailers(std::move(trailers));
386  }
387 }
388 
391  HTTPTransactionIngressSM::Event::onUpgrade)) {
392  return;
393  }
394  if (mustQueueIngress()) {
396  deferredIngress_->emplace(id_, HTTPEvent::Type::UPGRADE, protocol);
397  VLOG(4) << "Queued ingress event of type " << HTTPEvent::Type::UPGRADE
398  << " " << *this;
399  } else {
400  processIngressUpgrade(protocol);
401  }
402 }
403 
405  DestructorGuard g(this);
406  if (aborted_) {
407  return;
408  }
409  if (handler_ && !isIngressComplete()) {
410  handler_->onUpgrade(protocol);
411  }
412 }
413 
415  if (isIngressEOMSeen()) {
416  // This can happen when HTTPSession calls onIngressEOF()
418  return;
419  }
422  auto errorMsg = folly::to<std::string>(
423  "Content-Length/body mismatch: expecting another ",
425  LOG(ERROR) << errorMsg << " " << *this;
426  if (handler_) {
429  onError(ex);
430  }
431  return;
432  }
433 
434  // TODO: change the codec to not give an EOM callback after a 100 response?
435  // We could then delete the below 'if'
436  if (isUpstream() && extraResponseExpected()) {
437  VLOG(4) << "Ignoring EOM on initial 100 response on " << *this;
438  return;
439  }
441  HTTPTransactionIngressSM::Event::onEOM)) {
442  return;
443  }
444  // We need to update the read timeout here. We're not likely to be
445  // expecting any more ingress, and the timer should be cancelled
446  // immediately. If we are expecting more, this will reset the timer.
448  if (mustQueueIngress()) {
451  VLOG(4) << "Queued ingress event of type "
452  << HTTPEvent::Type::MESSAGE_COMPLETE << " " << *this;
453  } else {
455  }
456 }
457 
459  DestructorGuard g(this);
460  if (aborted_) {
461  return;
462  }
463  VLOG(4) << "ingress EOM on " << *this;
464  const bool wasComplete = isIngressComplete();
466  HTTPTransactionIngressSM::Event::eomFlushed)) {
467  return;
468  }
469  if (handler_) {
470  if (!wasComplete) {
471  handler_->onEOM();
472  }
473  } else {
475  }
477 }
478 
480  return (useFlowControl_ && sendWindow_.getSize() <= 0);
481 }
482 
484  return (!ingressPaused_ &&
486 }
487 
489  if (isExpectingIngress()) {
490  refreshTimeout();
491  } else {
492  cancelTimeout();
493  }
494 }
495 
497  VLOG(4) << "Marking ingress complete on " << *this;
498  ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
499  deferredIngress_.reset();
500  cancelTimeout();
501 }
502 
504  VLOG(4) << "Marking egress complete on " << *this;
506  int64_t deferredEgressBodyBytes =
507  folly::to<int64_t>(deferredEgressBody_.chainLength());
508  transport_.notifyEgressBodyBuffered(-deferredEgressBodyBytes);
509  }
511  if (isEnqueued()) {
512  dequeue();
513  }
514  egressState_ = HTTPTransactionEgressSM::State::SendingDone;
515 }
516 
519  DestructorGuard g(this);
520 
522  std::stringstream ss;
523  ss << "Invalid ingress state transition, state=" << ingressState_ <<
524  ", event=" << event << ", streamID=" << id_;
527  ex.setCodecStatusCode(ErrorCode::INTERNAL_ERROR);
528  // This will invoke sendAbort() and also inform the handler of the
529  // error and detach the handler.
530  onError(ex);
531  return false;
532  }
533  return true;
534 }
535 
537  DestructorGuard g(this);
538 
539  const bool wasAborted = aborted_; // see comment below
540  const bool wasEgressComplete = isEgressComplete();
541  const bool wasIngressComplete = isIngressComplete();
542  bool notify = (handler_);
543  HTTPException::Direction direction = error.getDirection();
544 
545  if (direction == HTTPException::Direction::INGRESS &&
547  // we got an ingress error, we've seen the entire message, but we're
548  // expecting more (window updates). These aren't coming, convert to
549  // INGRESS_AND_EGRESS
550  VLOG(4) << "Converting ingress error to ingress+egress due to"
551  " flow control, and aborting " << *this;
554  }
555 
556  if (error.getProxygenError() == kErrorStreamAbort) {
557  DCHECK(error.getDirection() ==
559  aborted_ = true;
560  } else if (error.hasCodecStatusCode()) {
561  DCHECK(error.getDirection() ==
563  sendAbort(error.getCodecStatusCode());
564  }
565 
566  switch (direction) {
570  if (wasEgressComplete && wasIngressComplete &&
571  // We mark egress complete before we get acknowledgement of the
572  // write segment finishing successfully.
573  // TODO: instead of using DestructorGuard hacks to keep txn around,
574  // use an explicit callback function and set egress complete after
575  // last byte flushes (or egress error occurs), see #3912823
576  (error.getProxygenError() != kErrorWriteTimeout || wasAborted)) {
577  notify = false;
578  }
579  break;
582  if (!wasEgressComplete && isIngressEOMSeen() && ingressErrorSeen_) {
583  // we've already seen an ingress error but we ignored it, hoping the
584  // handler would resume and read our queued EOM. Now both sides are
585  // dead and we need to kill this transaction.
587  }
588  if (wasEgressComplete) {
589  notify = false;
590  }
591  break;
593  if (isIngressEOMSeen()) {
594  // Not an error, for now
595  ingressErrorSeen_ = true;
596  return;
597  }
599  if (wasIngressComplete) {
600  notify = false;
601  }
602  break;
603  }
604  if (notify && handler_) {
605  // mark egress complete may result in handler detaching
606  handler_->onError(error);
607  }
608 }
609 
611  DestructorGuard g(this);
612  VLOG(4) << "received GOAWAY notification on " << *this;
613  // This callback can be received at any time and does not affect this
614  // transaction's ingress or egress state machines. If it would have
615  // affected this transaction's state, we would have received onError()
616  // instead.
617  if (handler_) {
618  handler_->onGoaway(code);
619  }
620 }
621 
623  DestructorGuard g(this);
624  VLOG(4) << "ingress timeout on " << *this;
625  pauseIngress();
626  bool windowUpdateTimeout = !isEgressComplete() && isExpectingWindowUpdate();
627  if (handler_) {
628  if (windowUpdateTimeout) {
630  folly::to<std::string>("ingress timeout, streamID=", id_));
632  // This is a protocol error
634  onError(ex);
635  } else {
637  folly::to<std::string>("ingress timeout, streamID=", id_));
639  onError(ex);
640  }
641  } else {
644  }
645 }
646 
648  if (!useFlowControl_) {
649  return;
650  }
651  DestructorGuard g(this);
652  VLOG(4) << "Remote side ack'd " << amount << " bytes " << *this ;
654  if (sendWindow_.free(amount)) {
656  } else {
657  LOG(ERROR) << "sendWindow_.free failed with amount=" << amount
658  << " capacity=" << sendWindow_.getCapacity()
659  << " outstanding=" << sendWindow_.getOutstanding()
660  << " " << *this;
662  }
663 }
664 
666  if (!useFlowControl_) {
667  return;
668  }
670  if (sendWindow_.setCapacity(newWindowSize)) {
672  } else {
673  LOG(ERROR) << "sendWindow_.setCapacity failed with newWindowSize="
674  << newWindowSize << " capacity=" << sendWindow_.getCapacity()
675  << " outstanding=" << sendWindow_.getOutstanding()
676  << " " << *this;
678  }
679 }
680 
682  DestructorGuard g(this);
683  VLOG(4) << "egress timeout on " << *this;
684  if (handler_) {
686  folly::to<std::string>("egress timeout, streamID=", id_));
688  onError(ex);
689  } else {
691  }
692 }
693 
695  DestructorGuard g(this);
696  if (transportCallback_) {
698  }
699 }
700 
702  DestructorGuard g(this);
703  if (transportCallback_) {
705  }
706 }
707 
709  DestructorGuard g(this);
710  if (transportCallback_) {
712  }
713 }
714 
716  DestructorGuard g(this);
717  if (transportCallback_) {
719  }
720 }
721 
722 void HTTPTransaction::onEgressLastByteAck(std::chrono::milliseconds latency) {
723  DestructorGuard g(this);
724  if (transportCallback_) {
726  }
727 }
728 
730  const HTTPMessage& headers,
731  bool eom) {
733  egressState_, HTTPTransactionEgressSM::Event::sendHeaders));
734  DCHECK(!isEgressComplete());
735  if (!headers.isRequest() && !isPushed()) {
737  }
738  if (headers.isRequest()) {
739  headRequest_ = (headers.getMethod() == HTTPMethod::HEAD);
740  }
741 
742  if (headers.isResponse() && !headRequest_) {
743  const auto& contentLen =
745  if (!contentLen.empty()) {
746  try {
747  expectedResponseLength_ = folly::to<uint64_t>(contentLen);
748  } catch (const folly::ConversionError& ex) {
749  LOG(ERROR) << "Invalid content-length: " << contentLen <<
750  ", ex=" << ex.what() << *this;
751  }
752  }
753  }
755  transport_.sendHeaders(this, headers, &size, eom);
756  if (transportCallback_) {
758  }
761  }
762  if (eom) {
764  egressState_, HTTPTransactionEgressSM::Event::sendEOM));
765  // trailers are supported in this case:
766  // trailers are for chunked encoding-transfer of a body
767  if (transportCallback_) {
769  }
771  egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
772  }
774 }
775 
777  sendHeadersWithOptionalEOM(header, true);
778 }
779 
781  sendHeadersWithOptionalEOM(header, false);
782 }
783 
784 void HTTPTransaction::sendBody(std::unique_ptr<folly::IOBuf> body) {
785  DestructorGuard guard(this);
787  egressState_, HTTPTransactionEgressSM::Event::sendBody));
788 
789  if (body) {
790  size_t bodyLen = body->computeChainDataLength();
792 
793  if (isEnqueued()) {
795  }
796  }
799 }
800 
801 bool HTTPTransaction::onWriteReady(const uint32_t maxEgress, double ratio) {
802  DestructorGuard g(this);
803  DCHECK(isEnqueued());
804  if (prioritySample_) {
805  updateRelativeWeight(ratio);
806  }
807  cumulativeRatio_ += ratio;
808  egressCalls_++;
809  sendDeferredBody(maxEgress);
810  return isEnqueued();
811 }
812 
813 // Send up to maxEgress body bytes, including pendingEOM if appropriate
815  const int32_t windowAvailable = sendWindow_.getSize();
816  const uint32_t sendWindow = useFlowControl_ ? std::min<uint32_t>(
817  maxEgress, windowAvailable > 0 ? windowAvailable : 0) : maxEgress;
818 
819  // We shouldn't be called if we have no pending body/EOM, egress is paused, or
820  // the send window is closed
821  CHECK((deferredEgressBody_.chainLength() > 0 ||
822  isEgressEOMQueued()) &&
823  sendWindow > 0);
824 
825  const size_t bytesLeft = deferredEgressBody_.chainLength();
826 
827  size_t canSend = std::min<size_t>(sendWindow, bytesLeft);
828 
829  if (maybeDelayForRateLimit()) {
830  // Timeout will call notifyTransportPendingEgress again
831  return 0;
832  }
833 
834  size_t curLen = 0;
835  size_t nbytes = 0;
836  bool willSendEOM = false;
837 
838  if (chunkHeaders_.empty()) {
839  curLen = canSend;
840  std::unique_ptr<IOBuf> body = deferredEgressBody_.split(curLen);
841  willSendEOM = hasPendingEOM();
842  DCHECK(curLen > 0 || willSendEOM);
843  if (curLen > 0) {
844  if (willSendEOM) {
845  // we have to dequeue BEFORE sending the EOM =(
846  dequeue();
847  }
848  nbytes = sendBodyNow(std::move(body), curLen, willSendEOM);
849  willSendEOM = false;
850  } // else we got called with only a pending EOM, handled below
851  } else {
852  // This body is expliticly chunked
853  while (!chunkHeaders_.empty() && canSend > 0) {
854  Chunk& chunk = chunkHeaders_.front();
855  if (!chunk.headerSent) {
856  nbytes += transport_.sendChunkHeader(this, chunk.length);
857  chunk.headerSent = true;
858  }
859  curLen = std::min<size_t>(chunk.length, canSend);
860  std::unique_ptr<folly::IOBuf> cur = deferredEgressBody_.split(curLen);
861  VLOG(4) << "sending " << curLen << " fin=false";
862  nbytes += sendBodyNow(std::move(cur), curLen, false);
863  canSend -= curLen;
864  chunk.length -= curLen;
865  if (chunk.length == 0) {
866  nbytes += transport_.sendChunkTerminator(this);
867  chunkHeaders_.pop_front();
868  } else {
869  DCHECK_EQ(canSend, 0);
870  }
871  }
872  willSendEOM = hasPendingEOM();
873  }
874  // Send any queued eom
875  if (willSendEOM) {
876  nbytes += sendEOMNow();
877  }
878 
879  // Update the handler's pause state
881 
882  if (transportCallback_) {
884  }
885  return nbytes;
886 }
887 
889  if (egressLimitBytesPerMs_ <= 0) {
890  // No rate limiting
891  return false;
892  }
893 
894  if (numLimitedBytesEgressed_ == 0) {
895  // If we haven't egressed any bytes yet, don't delay.
896  return false;
897  }
898 
899  int64_t limitedDurationMs = (int64_t) millisecondsBetween(
900  getCurrentTime(),
902  ).count();
903 
904  // Algebra! Try to figure out the next time send where we'll
905  // be allowed to send at least 1 full packet's worth. The
906  // formula we're using is:
907  // (bytesSoFar + packetSize) / (timeSoFar + delay) == targetRateLimit
908  std::chrono::milliseconds requiredDelay(
909  (
910  ((int64_t)numLimitedBytesEgressed_ + kApproximateMTU) -
911  ((int64_t)egressLimitBytesPerMs_ * limitedDurationMs)
913  );
914 
915  if (requiredDelay.count() <= 0) {
916  // No delay required
917  return false;
918  }
919 
920  if (requiredDelay > kRateLimitMaxDelay) {
921  // The delay should never be this long
922  VLOG(4) << "ratelim: Required delay too long (" << requiredDelay.count()
923  << "ms), ignoring";
924  return false;
925  }
926 
927  // Delay required
928 
929  egressRateLimited_ = true;
930 
931  if (timer_) {
932  timer_->scheduleTimeout(&rateLimitCallback_, requiredDelay);
933  }
934 
936  return true;
937 }
938 
940  egressRateLimited_ = false;
942 }
943 
945  VLOG(4) << "egress EOM on " << *this;
946  // TODO: with ByteEvent refactor, we will have to delay changing this
947  // state until later
949  egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
950  size_t nbytes = transport_.sendEOM(this, trailers_.get());
951  trailers_.reset();
952  return nbytes;
953 }
954 
955 size_t HTTPTransaction::sendBodyNow(std::unique_ptr<folly::IOBuf> body,
956  size_t bodyLen, bool sendEom) {
957  static const std::string noneStr = "None";
958  DCHECK(body);
959  DCHECK_GT(bodyLen, 0);
960  size_t nbytes = 0;
961  if (useFlowControl_) {
962  CHECK(sendWindow_.reserve(bodyLen));
963  }
964  VLOG(4) << "Sending " << bodyLen
965  << " bytes of body. eom=" << ((sendEom) ? "yes" : "no")
966  << " send_window is "
967  << (useFlowControl_
968  ? folly::to<std::string>(
970  : noneStr)
971  << " trailers=" << ((trailers_) ? "yes" : "no") << " " << *this;
972  if (sendEom && !trailers_) {
974  egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
975  } else if (ingressErrorSeen_ && isExpectingWindowUpdate()) {
976  // I don't know how we got here but we're in trouble. We need a window
977  // update to continue but we've already seen an ingress error.
979  folly::to<std::string>("window blocked with ingress error,"
980  " streamID=", id_));
983  onError(ex);
984  return 0;
985  }
987  nbytes = transport_.sendBody(this,
988  std::move(body),
989  sendEom && !trailers_,
991  if (sendEom && trailers_) {
992  sendEOMNow();
993  }
994  if (isPrioritySampled()) {
996  }
997  if (egressLimitBytesPerMs_ > 0) {
998  numLimitedBytesEgressed_ += nbytes;
999  }
1000  return nbytes;
1001 }
1002 
1004  DestructorGuard g(this);
1006  egressState_, HTTPTransactionEgressSM::Event::sendEOM))
1007  << ", " << *this;
1010  auto errorMsg =
1011  folly::to<std::string>("Content-Length/body mismatch: expected= ",
1013  ", actual= ",
1015  LOG(ERROR) << errorMsg << " " << *this;
1016  }
1017 
1018  if (deferredEgressBody_.chainLength() == 0 && chunkHeaders_.empty()) {
1019  // there is nothing left to send, egress the EOM directly. For SPDY
1020  // this will jump the txn queue
1021  if (!isEnqueued()) {
1022  size_t nbytes = sendEOMNow();
1024  if (transportCallback_) {
1026  }
1027  } else {
1028  // If the txn is enqueued, sendDeferredBody()
1029  // should take care of sending the EOM.
1030  // This can happen for some uses of the egress queue
1031  VLOG(4) << "Queued egress EOM with no body"
1032  << "[egressState=" << egressState_ << ", "
1033  << "ingressState=" << ingressState_ << ", "
1034  << "egressPaused=" << egressPaused_ << ", "
1035  << "ingressPaused=" << ingressPaused_ << ", "
1036  << "aborted=" << aborted_ << ", "
1037  << "enqueued=" << isEnqueued() << ", "
1038  << "chainLength=" << deferredEgressBody_.chainLength() << "]"
1039  << " on " << *this;
1040  }
1041  } else {
1042  VLOG(4) << "Queued egress EOM on " << *this;
1044  }
1045 }
1046 
1050 }
1051 
1053  DestructorGuard g(this);
1056  if (aborted_) {
1057  // This can happen in cases where the abort is sent before notifying the
1058  // handler, but its logic also wants to abort
1059  VLOG(4) << "skipping redundant abort";
1060  return;
1061  }
1062  VLOG(4) << "aborting transaction " << *this;
1063  aborted_ = true;
1064  size_t nbytes = transport_.sendAbort(this, statusCode);
1065  if (transportCallback_) {
1067  size.uncompressed = nbytes;
1069  }
1070 }
1071 
1073  VLOG(4)<< "pauseIngress request " << *this;
1074  DestructorGuard g(this);
1075  if (ingressPaused_) {
1076  VLOG(4) << "can't pause ingress; ingressPaused=" << ingressPaused_;
1077  return;
1078  }
1079  ingressPaused_ = true;
1080  cancelTimeout();
1081  transport_.pauseIngress(this);
1082 }
1083 
1085  VLOG(4) << "resumeIngress request " << *this;
1086  DestructorGuard g(this);
1087  if (!ingressPaused_ || isIngressComplete()) {
1088  VLOG(4) << "can't resume ingress, ingressPaused=" << ingressPaused_
1089  << ", ingressComplete=" << isIngressComplete()
1090  << ", inResume_=" << inResume_ << " " << *this;
1091  return;
1092  }
1093  ingressPaused_ = false;
1094  transport_.resumeIngress(this);
1095  if (inResume_) {
1096  VLOG(4) << "skipping recursive resume loop " << *this;
1097  return;
1098  }
1099  inResume_ = true;
1100 
1101  if (deferredIngress_ && (maxDeferredIngress_ <= deferredIngress_->size())) {
1103  }
1104 
1105  // Process any deferred ingress callbacks
1106  // Note: we recheck the ingressPaused_ state because a callback
1107  // invoked by the resumeIngress() call above could have re-paused
1108  // the transaction.
1109  while (!ingressPaused_ && deferredIngress_ && !deferredIngress_->empty()) {
1110  HTTPEvent& callback(deferredIngress_->front());
1111  VLOG(5) << "Processing deferred ingress callback of type "
1112  << callback.getEvent() << " " << *this;
1113  switch (callback.getEvent()) {
1115  LOG(FATAL) << "unreachable";
1116  break;
1118  processIngressHeadersComplete(callback.getHeaders());
1119  break;
1120  case HTTPEvent::Type::BODY: {
1121  unique_ptr<IOBuf> data = callback.getBody();
1122  auto len = data->computeChainDataLength();
1123  CHECK(recvWindow_.free(len));
1124  processIngressBody(std::move(data), len);
1125  } break;
1127  processIngressChunkHeader(callback.getChunkLength());
1128  break;
1131  break;
1133  processIngressTrailers(callback.getTrailers());
1134  break;
1137  break;
1139  processIngressUpgrade(callback.getUpgradeProtocol());
1140  break;
1141  }
1142  if (deferredIngress_) {
1143  deferredIngress_->pop();
1144  }
1145  }
1147  inResume_ = false;
1148 }
1149 
1151  VLOG(4) << "asked to pause egress " << *this ;
1152  DestructorGuard g(this);
1153  if (egressPaused_) {
1154  VLOG(4) << "egress already paused " << *this ;
1155  return;
1156  }
1157  egressPaused_ = true;
1159 }
1160 
1162  VLOG(4) << "asked to resume egress" << *this;
1163  DestructorGuard g(this);
1164  if (!egressPaused_) {
1165  VLOG(4) << "egress already not paused" << *this;
1166  return;
1167  }
1168  egressPaused_ = false;
1170 }
1171 
1173  egressLimitBytesPerMs_ = bitsPerSecond / 8000;
1174  if (bitsPerSecond > 0 && egressLimitBytesPerMs_ == 0) {
1175  VLOG(4) << "ratelim: Limit too low (" << bitsPerSecond << "), ignoring";
1176  }
1179 }
1180 
1182  DestructorGuard guard(this);
1183  if (!egressRateLimited_ &&
1185  isEgressEOMQueued()) &&
1186  (!useFlowControl_ || sendWindow_.getSize() > 0)) {
1187  // Egress isn't paused, we have something to send, and flow
1188  // control isn't blocking us.
1189  if (!isEnqueued()) {
1190  // Insert into the queue and let the session know we've got something
1194  }
1195  } else if (isEnqueued()) {
1196  // Nothing to send, or not allowed to send right now.
1197  int64_t deferredEgressBodyBytes =
1198  folly::to<int64_t>(deferredEgressBody_.chainLength());
1199  transport_.notifyEgressBodyBuffered(-deferredEgressBodyBytes);
1201  }
1203 }
1204 
1206  int64_t availWindow =
1208  // do not count transaction stalled if no more bytes to send,
1209  // i.e. when availWindow == 0
1210  if (useFlowControl_ && availWindow < 0 && !flowControlPaused_) {
1211  VLOG(4) << "transaction stalled by flow control" << *this;
1212  if (stats_) {
1214  }
1215  }
1216  flowControlPaused_ = useFlowControl_ && availWindow <= 0;
1217  bool handlerShouldBePaused = egressPaused_ || flowControlPaused_ ||
1219 
1220  if (handler_ && handlerShouldBePaused != handlerEgressPaused_) {
1221  if (handlerShouldBePaused) {
1222  handlerEgressPaused_ = true;
1224  } else {
1225  handlerEgressPaused_ = false;
1227  }
1228  }
1229 }
1230 
1233  tableInfo.ingressHeaderTableSize_;
1235  tableInfo.ingressBytesStored_;
1237  tableInfo.ingressHeadersStored_;
1238 }
1239 
1242  tableInfo.egressHeaderTableSize_;
1244  tableInfo.egressBytesStored_;
1246  tableInfo.egressHeadersStored_;
1247 }
1248 
1250  return tableInfo_;
1251 }
1252 
1254  return ingressPaused_ || (deferredIngress_ && !deferredIngress_->empty());
1255 }
1256 
1258  if (!deferredIngress_) {
1259  deferredIngress_ = std::make_unique<std::queue<HTTPEvent>>();
1260  }
1261 }
1262 
1264  DestructorGuard g(this);
1265  CHECK_EQ(*pushTxn->assocStreamId_, id_);
1266  if (!handler_) {
1267  VLOG(4) << "Cannot add a pushed txn to an unhandled txn";
1268  return false;
1269  }
1270  handler_->onPushedTransaction(pushTxn);
1271  if (!pushTxn->getHandler()) {
1272  VLOG(4) << "Failed to create a handler for push transaction";
1273  return false;
1274  }
1275  pushedTransactions_.insert(pushTxn->getID());
1276  return true;
1277 }
1278 
1280  DestructorGuard g(this);
1281  CHECK_EQ(*(exTxn->getControlStream()), id_);
1282  if (!handler_) {
1283  LOG(ERROR) << "Cannot add a exTxn to an unhandled txn";
1284  return false;
1285  }
1286  handler_->onExTransaction(exTxn);
1287  if (!exTxn->getHandler()) {
1288  LOG(ERROR) << "Failed to create a handler for ExTransaction";
1289  return false;
1290  }
1291  exTransactions_.insert(exTxn->getID());
1292  return true;
1293 }
1294 
1296  std::chrono::milliseconds transactionTimeout) {
1297  transactionTimeout_ = transactionTimeout;
1298  VLOG(4) << "HTTPTransaction: transaction timeout is set to "
1299  << std::chrono::duration_cast<std::chrono::milliseconds>(
1300  transactionTimeout)
1301  .count();
1302  refreshTimeout();
1303 }
1304 
1305 void HTTPTransaction::describe(std::ostream& os) const {
1306  transport_.describe(os);
1307  os << ", streamID=" << id_;
1308 }
1309 
1310 /*
1311  * TODO: when HTTPSession sends a SETTINGS frame indicating a
1312  * different initial window, it should call this function on all its
1313  * transactions.
1314  */
1316  // Depending on whether delta is positive or negative it will cause the
1317  // window to either increase or decrease.
1318  if (!useFlowControl_) {
1319  return;
1320  }
1321  int32_t delta = capacity - recvWindow_.getCapacity();
1322  if (delta < 0) {
1323  // For now, we're disallowing shrinking the window, since it can lead
1324  // to FLOW_CONTROL_ERRORs if there is data in flight.
1325  VLOG(4) << "Refusing to shrink the recv window";
1326  return;
1327  }
1328  if (!recvWindow_.setCapacity(capacity)) {
1329  return;
1330  }
1331  recvToAck_ += delta;
1333 }
1334 
1336  if (recvToAck_ > 0 && useFlowControl_ && !isIngressEOMSeen() &&
1338  egressState_ != HTTPTransactionEgressSM::State::Start ||
1339  ingressState_ != HTTPTransactionIngressSM::State::Start)) {
1340  // Down egress upstream window updates until after headers
1341  VLOG(4) << "recv_window is " << recvWindow_.getSize()
1342  << " / " << recvWindow_.getCapacity() << " after acking "
1343  << recvToAck_ << " " << *this ;
1345  recvToAck_ = 0;
1346  }
1347 }
1348 
1350  return recvToAck_;
1351 }
1352 
1353 std::ostream&
1354 operator<<(std::ostream& os, const HTTPTransaction& txn) {
1355  txn.describe(os);
1356  return os;
1357 }
1358 
1360  newPriority = HTTPMessage::normalizePriority(newPriority);
1361  CHECK_GE(newPriority, 0);
1366 }
1367 
1369  const http2::PriorityUpdate& newPriority) {
1370  onPriorityUpdate(newPriority);
1372 }
1373 
1375  priority_ = priority;
1376 
1378  queueHandle_,
1379  priority_,
1380  &currentDepth_);
1381  if(priority_.streamDependency != 0 && currentDepth_ == 1) {
1382  priorityFallback_ = true;
1383  }
1384 }
1385 
1388  void accumulate(uint64_t weighted, uint64_t total) {
1389  weighted_ += weighted;
1390  total_ += total;
1391  }
1392 
1393  void accumulateWeighted(uint64_t weighted) {
1394  weighted_ += weighted;
1395  }
1396 
1398  total_ += total;
1399  }
1400 
1401  double getWeightedAverage() const {
1402  return total_ ? (double)weighted_ / (double)total_ : 0;
1403  }
1404  private:
1405  uint64_t weighted_{0};
1406  uint64_t total_{0};
1407  };
1408 
1409  struct WeightedValue {
1410  uint64_t value_{0};
1411 
1413  byTransactionBytesSent_.accumulate(value_ * bytes, bytes);
1414  }
1415 
1417  bySessionBytesScheduled_.accumulate(value_ * bytes, bytes);
1418  }
1419 
1421  wa) const {
1422  wa.byTransactionBytes_ =
1423  byTransactionBytesSent_.getWeightedAverage();
1424  wa.bySessionBytes_ =
1425  bySessionBytesScheduled_.getWeightedAverage();
1426  }
1427  private:
1430  };
1431 
1432 public:
1434  tnx_(tnx),
1435  transactionBytesScheduled_(false) {}
1436 
1437  void updateContentionsCount(uint64_t contentions, uint64_t depth) {
1438  transactionBytesScheduled_ = false;
1439  ratio_ = 0.0;
1440  contentions_.value_ = contentions;
1441  depth_.value_ = depth;
1442  }
1443 
1445  transactionBytesScheduled_ = true;
1446  measured_weight_.accumulateWeighted(bytes);
1447  if (contentions_.value_) {
1448  contentions_.accumulateByTransactionBytes(bytes);
1449  } else {
1450  VLOG(5) << "transfer " << bytes
1451  << " transaction body bytes while contentions count = 0 "
1452  << *tnx_;
1453  }
1454  depth_.accumulateByTransactionBytes(bytes);
1455  }
1456 
1458  measured_weight_.accumulateTotal(bytes);
1459  expected_weight_.accumulate((ratio_ * bytes) + 0.5, bytes);
1460  if (contentions_.value_) {
1461  contentions_.accumulateBySessionBytes(bytes);
1462  } else {
1463  VLOG(5) << "transfer " << bytes
1464  << " session body bytes while contentions count = 0 "
1465  << *tnx_;
1466  }
1467  depth_.accumulateBySessionBytes(bytes);
1468  }
1469 
1470  void updateRatio(double ratio) {
1471  ratio_ = ratio;
1472  }
1473 
1475  return transactionBytesScheduled_;
1476  }
1477 
1479  contentions_.getSummary(summary.contentions_);
1480  depth_.getSummary(summary.depth_);
1481  summary.expected_weight_ = expected_weight_.getWeightedAverage();
1482  summary.measured_weight_ = measured_weight_.getWeightedAverage();
1483  }
1484 private:
1485  // TODO: remove tnx_ when not needed
1486  HTTPTransaction* tnx_; // needed for error reporting, will be removed
1491  double ratio_;
1492  bool transactionBytesScheduled_:1;
1493 };
1494 
1496  if (sampled) {
1497  prioritySample_ = std::make_unique<PrioritySample>(this);
1498  } else {
1499  prioritySample_.reset();
1500  }
1501 }
1502 
1504  CHECK(prioritySample_);
1505  prioritySample_->updateContentionsCount(contentions,
1506  queueHandle_->calculateDepth(false));
1507 }
1508 
1510  CHECK(prioritySample_);
1511  prioritySample_->updateRatio(ratio);
1512 }
1513 
1515  CHECK(prioritySample_);
1516  // Do not accumulate session bytes utill header is sent.
1517  // Otherwise, the session bytes could be accumulated for a transaction
1518  // that is not allowed to egress yet.
1519  // Do not accumulate session bytes if transaction is paused.
1520  // On the other hand, if the transaction is part of the egress,
1521  // always accumulate the session bytes.
1522  if ((bytes && firstHeaderByteSent_ && !egressPaused_ &&
1524  || prioritySample_->isTransactionBytesScheduled()) {
1525  prioritySample_->updateSessionBytesSheduled(bytes);
1526  }
1527 }
1528 
1530  CHECK(prioritySample_);
1531  if (bytes) {
1532  prioritySample_->updateTransactionBytesSent(bytes);
1533  }
1534 }
1535 
1537  HTTPTransaction::PrioritySampleSummary& summary) const {
1538  if (prioritySample_) {
1539  prioritySample_->getSummary(summary);
1540  return true;
1541  }
1542  return false;
1543 }
1544 
1545 
1546 } // proxygen
size_t sendDeferredBody(uint32_t maxEgress)
virtual void lastByteAcked(std::chrono::milliseconds latency) noexcept=0
#define FOLLY_SCOPED_TRACE_SECTION(arg,...)
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
virtual void resumeIngress(HTTPTransaction *txn) noexcept=0
void onError(const HTTPException &error)
virtual void onPushedTransaction(HTTPTransaction *) noexcept
void processIngressChunkHeader(size_t length)
bool onWriteReady(uint32_t maxEgress, double ratio)
const TransportDirection direction_
HTTPCodec::StreamID id_
std::chrono::milliseconds millisecondsBetween(std::chrono::time_point< ClockType > finish, std::chrono::time_point< ClockType > start)
Definition: Time.h:85
void processIngressHeadersComplete(std::unique_ptr< HTTPMessage > msg)
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
bool responseBodyMustBeEmpty(unsigned status)
Definition: RFC2616.cpp:54
size_t chainLength() const
Definition: IOBufQueue.h:492
spdy::GoawayStatusCode statusCode
Definition: SPDYCodec.cpp:110
bool getPrioritySampleSummary(PrioritySampleSummary &summary) const
uint32_t getCapacity() const
Definition: Window.cpp:32
ProxygenError getProxygenError() const
Definition: Exception.h:50
virtual void onEgressPaused() noexcept=0
virtual void onChunkComplete() noexcept
std::ostream & operator<<(std::ostream &os, const HeaderTable &table)
virtual void onEgressResumed() noexcept=0
void updateIngressHPACKTableInfo(HPACKTableInfo)
virtual void setReceiveWindow(uint32_t capacity)
std::unique_ptr< HTTPHeaders > trailers_
bool hasCodecStatusCode() const
Definition: HTTPException.h:95
virtual void onGoaway(ErrorCode) noexcept
virtual void sendHeaders(HTTPTransaction *txn, const HTTPMessage &headers, HTTPHeaderSize *size, bool eom) noexcept=0
virtual void onError(const HTTPException &error) noexcept=0
uint16_t getStatusCode() const
virtual void bodyBytesReceived(size_t size) noexcept=0
virtual size_t sendChunkTerminator(HTTPTransaction *txn) noexcept=0
virtual Handle updatePriority(Handle handle, http2::PriorityUpdate pri, uint64_t *depth=nullptr)=0
uint32_t getOutstanding() const
Definition: Window.cpp:37
proxygen::TimePoint startRateLimit_
virtual bool isDraining() const =0
virtual void detachTransaction() noexcept=0
void reset(bool useFlowControl, uint32_t receiveInitialWindowSize, uint32_t receiveStreamWindowSize, uint32_t sendInitialWindowSize)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
virtual HPACKTableInfo getHPACKTableInfo() const
Definition: HTTPCodec.h:338
void getSummary(HTTPTransaction::PrioritySampleSummary &summary) const
bool onExTransaction(HTTPTransaction *txn)
void setIdleTimeout(std::chrono::milliseconds transactionTimeout)
void setProxygenError(ProxygenError proxygenError)
Definition: Exception.h:46
virtual CodecProtocol getProtocol() const =0
virtual void onUpgrade(UpgradeProtocol protocol) noexcept=0
virtual void lastByteFlushed() noexcept=0
void onIngressBody(std::unique_ptr< folly::IOBuf > chain, uint16_t padding)
virtual void onChunkHeader(size_t) noexcept
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void setCodecStatusCode(ErrorCode statusCode)
Definition: HTTPException.h:98
virtual uint64_t calculateDepth(bool includeVirtual=true) const =0
void updateSessionBytesSheduled(uint64_t bytes)
void onIngressChunkHeader(size_t length)
stop_watch< std::chrono::milliseconds > timer_
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
Definition: error.h:48
folly::Optional< HTTPCodec::ExAttributes > exAttributes_
virtual void recordTransactionStalled() noexcept=0
int32_t getSize() const
Definition: Window.cpp:23
bool validateIngressStateTransition(HTTPTransactionIngressSM::Event)
bool reserve(uint32_t amount, bool strict=true)
Definition: Window.cpp:41
void processIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
virtual void firstHeaderByteFlushed() noexcept=0
virtual void detach(HTTPTransaction *txn) noexcept=0
void scheduleTimeout(Callback *callback, std::chrono::milliseconds timeout)
void onIngressHeadersComplete(std::unique_ptr< HTTPMessage > msg)
folly::Optional< HTTPCodec::StreamID > assocStreamId_
Type getEvent() const
Definition: HTTPEvent.h:83
void onIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
size_t sendBodyNow(std::unique_ptr< folly::IOBuf > body, size_t bodyLen, bool eom)
void onGoaway(ErrorCode code)
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
virtual void onBody(std::unique_ptr< folly::IOBuf > chain) noexcept=0
void updateEgressHPACKTableInfo(HPACKTableInfo)
virtual void recordTransactionClosed() noexcept=0
static bool transit(State &state, Event event)
Definition: StateMachine.h:27
void updateTransactionBytesSent(uint64_t bytes)
void setEgressRateLimit(uint64_t bitsPerSecond)
virtual void notifyEgressBodyBuffered(int64_t bytes) noexcept=0
RateLimitCallback rateLimitCallback_
virtual void removeTransaction(Handle handle)=0
void updateAndSendPriority(int8_t newPriority)
void onIngressWindowUpdate(uint32_t amount)
virtual void recordTransactionOpened() noexcept=0
void onEgressLastByteAck(std::chrono::milliseconds latency)
virtual Handle addTransaction(HTTPCodec::StreamID id, http2::PriorityUpdate pri, HTTPTransaction *txn, bool permanent=false, uint64_t *depth=nullptr)=0
Direction getDirection() const
Definition: HTTPException.h:67
HTTPTransactionIngressSM::State ingressState_
bool onPushedTransaction(HTTPTransaction *txn)
folly::Optional< uint64_t > expectedContentLengthRemaining_
std::unique_ptr< PrioritySample > prioritySample_
bool isExpectingWindowUpdate() const
void onDelayedDestroy(bool delayed) override
virtual size_t sendAbort(HTTPTransaction *txn, ErrorCode statusCode) noexcept=0
http2::PriorityUpdate priority_
HTTPSessionStats * stats_
std::list< Chunk > chunkHeaders_
HTTPTransactionEgressSM::State egressState_
virtual size_t sendEOM(HTTPTransaction *txn, const HTTPHeaders *trailers) noexcept=0
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
ErrorCode getCodecStatusCode() const
virtual size_t sendWindowUpdate(HTTPTransaction *txn, uint32_t bytes) noexcept=0
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
Definition: Optional.h:300
std::unique_ptr< std::queue< HTTPEvent > > deferredIngress_
HPACKTableInfo & getHPACKTableInfo()
std::set< HTTPCodec::StreamID > exTransactions_
virtual void describe(std::ostream &) const =0
virtual void pauseIngress(HTTPTransaction *txn) noexcept=0
virtual StreamID mapPriorityToDependency(uint8_t) const
Definition: HTTPCodec.h:706
virtual void sendBody(std::unique_ptr< folly::IOBuf > body)
const Handler * getHandler() const
virtual void sendHeaders(const HTTPMessage &headers)
virtual void notifyIngressBodyProcessed(uint32_t bytes) noexcept=0
folly::HHWheelTimer * timer_
folly::Optional< uint64_t > expectedResponseLength_
virtual void onExTransaction(HTTPTransaction *) noexcept
HTTPHeaders & getHeaders()
Definition: HTTPMessage.h:273
void updateRelativeWeight(double ratio)
void describe(std::ostream &os) const
typename T::Event Event
Definition: StateMachine.h:21
const std::string & getSingleOrEmpty(const T &nameOrCode) const
Definition: HTTPHeaders.h:420
HTTP2PriorityQueueBase::Handle queueHandle_
void updateContentionsCount(uint64_t contentions, uint64_t depth)
virtual bool extraResponseExpected() const
folly::Optional< HTTPCodec::StreamID > getControlStream() const
virtual size_t sendChunkHeader(HTTPTransaction *txn, size_t length) noexcept=0
int * count
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
bool isResponse() const
Definition: HTTPMessage.h:668
TransportCallback * transportCallback_
virtual void firstByteFlushed() noexcept=0
std::set< HTTPCodec::StreamID > pushedTransactions_
virtual void sendHeadersWithEOM(const HTTPMessage &headers)
virtual void onHeadersComplete(std::unique_ptr< HTTPMessage > msg) noexcept=0
virtual size_t sendPriority(HTTPTransaction *txn, const http2::PriorityUpdate &pri) noexcept=0
virtual size_t sendBody(HTTPTransaction *txn, std::unique_ptr< folly::IOBuf >, bool eom, bool trackLastByteFlushed) noexcept=0
virtual void onEOM() noexcept=0
const char * string
Definition: Conv.cpp:212
HTTPTransaction(TransportDirection direction, HTTPCodec::StreamID id, uint32_t seqNo, Transport &transport, HTTP2PriorityQueueBase &egressQueue, folly::HHWheelTimer *timer=nullptr, const folly::Optional< std::chrono::milliseconds > &defaultTimeout=folly::Optional< std::chrono::milliseconds >(), HTTPSessionStats *stats=nullptr, bool useFlowControl=false, uint32_t receiveInitialWindowSize=0, uint32_t sendInitialWindowSize=0, http2::PriorityUpdate=http2::DefaultPriority, folly::Optional< HTTPCodec::StreamID > assocStreamId=HTTPCodec::NoStream, folly::Optional< HTTPCodec::ExAttributes > exAttributes=HTTPCodec::NoExAttributes)
bool setCapacity(uint32_t capacity)
Definition: Window.cpp:84
g_t g(f_t)
AsyncFizzClient::UniquePtr transport_
void setPrioritySampled(bool sampled)
void onIngressUpgrade(UpgradeProtocol protocol)
virtual void headerBytesGenerated(HTTPHeaderSize &size) noexcept=0
void onIngressSetSendWindow(uint32_t newWindowSize)
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
folly::IOBufQueue deferredEgressBody_
virtual void onTrailers(std::unique_ptr< HTTPHeaders > trailers) noexcept=0
uint64_t StreamID
Definition: HTTPCodec.h:49
folly::Optional< uint64_t > actualResponseLength_
FOLLY_CPP14_CONSTEXPR const Value & value() const &
Definition: Optional.h:268
virtual const HTTPCodec & getCodec() const noexcept=0
void processIngressUpgrade(UpgradeProtocol protocol)
void processIngressBody(std::unique_ptr< folly::IOBuf > chain, size_t len)
bool isRequest() const
Definition: HTTPMessage.h:661
bool free(uint32_t amount)
Definition: Window.cpp:63
folly::Optional< std::chrono::milliseconds > transactionTimeout_
folly::Optional< HTTPMethod > getMethod() const
static uint8_t normalizePriority(int8_t pri)
Definition: HTTPMessage.h:576
HTTPCodec::StreamID getID() const
void onPriorityUpdate(const http2::PriorityUpdate &priority)
virtual void notifyPendingEgress() noexcept=0
void updateContentionsCount(uint64_t contentions)
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
HTTP2PriorityQueueBase & egressQueue_
virtual void sendHeadersWithOptionalEOM(const HTTPMessage &headers, bool eom)
virtual void clearPendingEgress(Handle h)=0
virtual void headerBytesReceived(const HTTPHeaderSize &size) noexcept=0
virtual void bodyBytesGenerated(size_t nbytes) noexcept=0
void getSummary(HTTPTransaction::PrioritySampleSummary::WeightedAverage &wa) const
virtual void signalPendingEgress(Handle h)=0