16 #include <glog/logging.h> 22 using std::unique_ptr;
27 const int64_t kApproximateMTU = 1400;
28 const std::chrono::seconds kRateLimitMaxDelay(10);
48 deferredEgressBody_(
folly::IOBufQueue::cacheChainLength()),
49 direction_(direction),
54 recvWindow_(receiveInitialWindowSize),
55 sendWindow_(sendInitialWindowSize),
56 egressQueue_(egressQueue),
57 assocStreamId_(assocId),
59 ingressPaused_(false),
61 flowControlPaused_(false),
62 handlerEgressPaused_(false),
63 egressRateLimited_(false),
64 useFlowControl_(useFlowControl),
67 firstByteSent_(false),
68 firstHeaderByteSent_(false),
71 ingressErrorSeen_(false),
72 priorityFallback_(false),
74 enableLastByteFlushedTracking_(false),
75 transactionTimeout_(defaultTimeout),
80 egressState_ = HTTPTransactionEgressSM::State::SendingDone;
82 ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
90 egressState_ = HTTPTransactionEgressSM::State::SendingDone;
92 ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
116 VLOG(4) <<
"destroying transaction " << *
this;
159 std::unique_ptr<HTTPMessage> msg) {
166 HTTPTransactionIngressSM::Event::onHeaders)) {
169 if (msg->isRequest()) {
177 const auto& contentLen =
179 if (!contentLen.empty()) {
183 LOG(
ERROR) <<
"Invalid content-length: " << contentLen
184 <<
", ex=" << ex.what() << *
this;
198 VLOG(4) <<
"Queued ingress event of type " 206 std::unique_ptr<HTTPMessage> msg) {
230 HTTPTransactionIngressSM::Event::onBody)) {
238 auto errorMsg = folly::to<std::string>(
239 "Content-Length/body mismatch: received=",
241 " expecting no more than ",
243 LOG(
ERROR) << errorMsg <<
" " << *
this;
257 LOG(
ERROR) <<
"recvWindow_.reserve failed with len=" << len
258 <<
" padding=" << padding
273 <<
" size=" << len <<
" " << *
this;
312 HTTPTransactionIngressSM::Event::onChunkHeader)) {
319 <<
" size=" << length <<
" " << *
this;
338 HTTPTransactionIngressSM::Event::onChunkComplete)) {
344 VLOG(4) <<
"Queued ingress event of type " 364 HTTPTransactionIngressSM::Event::onTrailers)) {
371 VLOG(4) <<
"Queued ingress event of type " 391 HTTPTransactionIngressSM::Event::onUpgrade)) {
422 auto errorMsg = folly::to<std::string>(
423 "Content-Length/body mismatch: expecting another ",
425 LOG(
ERROR) << errorMsg <<
" " << *
this;
437 VLOG(4) <<
"Ignoring EOM on initial 100 response on " << *
this;
441 HTTPTransactionIngressSM::Event::onEOM)) {
451 VLOG(4) <<
"Queued ingress event of type " 463 VLOG(4) <<
"ingress EOM on " << *
this;
466 HTTPTransactionIngressSM::Event::eomFlushed)) {
497 VLOG(4) <<
"Marking ingress complete on " << *
this;
498 ingressState_ = HTTPTransactionIngressSM::State::ReceivingDone;
504 VLOG(4) <<
"Marking egress complete on " << *
this;
506 int64_t deferredEgressBodyBytes =
514 egressState_ = HTTPTransactionEgressSM::State::SendingDone;
522 std::stringstream ss;
523 ss <<
"Invalid ingress state transition, state=" <<
ingressState_ <<
524 ", event=" <<
event <<
", streamID=" <<
id_;
550 VLOG(4) <<
"Converting ingress error to ingress+egress due to" 551 " flow control, and aborting " << *
this;
570 if (wasEgressComplete && wasIngressComplete &&
588 if (wasEgressComplete) {
599 if (wasIngressComplete) {
612 VLOG(4) <<
"received GOAWAY notification on " << *
this;
624 VLOG(4) <<
"ingress timeout on " << *
this;
628 if (windowUpdateTimeout) {
630 folly::to<std::string>(
"ingress timeout, streamID=",
id_));
637 folly::to<std::string>(
"ingress timeout, streamID=",
id_));
652 VLOG(4) <<
"Remote side ack'd " << amount <<
" bytes " << *this ;
657 LOG(
ERROR) <<
"sendWindow_.free failed with amount=" << amount
673 LOG(
ERROR) <<
"sendWindow_.setCapacity failed with newWindowSize=" 683 VLOG(4) <<
"egress timeout on " << *
this;
686 folly::to<std::string>(
"egress timeout, streamID=",
id_));
733 egressState_, HTTPTransactionEgressSM::Event::sendHeaders));
743 const auto& contentLen =
745 if (!contentLen.empty()) {
749 LOG(
ERROR) <<
"Invalid content-length: " << contentLen <<
750 ", ex=" << ex.what() << *
this;
764 egressState_, HTTPTransactionEgressSM::Event::sendEOM));
771 egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
787 egressState_, HTTPTransactionEgressSM::Event::sendBody));
817 maxEgress, windowAvailable > 0 ? windowAvailable : 0) : maxEgress;
827 size_t canSend = std::min<size_t>(sendWindow, bytesLeft);
836 bool willSendEOM =
false;
842 DCHECK(curLen > 0 || willSendEOM);
859 curLen = std::min<size_t>(chunk.
length, canSend);
861 VLOG(4) <<
"sending " << curLen <<
" fin=false";
869 DCHECK_EQ(canSend, 0);
908 std::chrono::milliseconds requiredDelay(
915 if (requiredDelay.count() <= 0) {
920 if (requiredDelay > kRateLimitMaxDelay) {
922 VLOG(4) <<
"ratelim: Required delay too long (" << requiredDelay.count()
945 VLOG(4) <<
"egress EOM on " << *
this;
949 egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
956 size_t bodyLen,
bool sendEom) {
959 DCHECK_GT(bodyLen, 0);
964 VLOG(4) <<
"Sending " << bodyLen
965 <<
" bytes of body. eom=" << ((sendEom) ?
"yes" :
"no")
966 <<
" send_window is " 968 ? folly::to<std::string>(
971 <<
" trailers=" << ((
trailers_) ?
"yes" :
"no") <<
" " << *
this;
974 egressState_, HTTPTransactionEgressSM::Event::eomFlushed));
979 folly::to<std::string>(
"window blocked with ingress error," 1006 egressState_, HTTPTransactionEgressSM::Event::sendEOM))
1011 folly::to<std::string>(
"Content-Length/body mismatch: expected= ",
1015 LOG(
ERROR) << errorMsg <<
" " << *
this;
1031 VLOG(4) <<
"Queued egress EOM with no body" 1042 VLOG(4) <<
"Queued egress EOM on " << *
this;
1059 VLOG(4) <<
"skipping redundant abort";
1062 VLOG(4) <<
"aborting transaction " << *
this;
1073 VLOG(4)<<
"pauseIngress request " << *
this;
1076 VLOG(4) <<
"can't pause ingress; ingressPaused=" <<
ingressPaused_;
1085 VLOG(4) <<
"resumeIngress request " << *
this;
1088 VLOG(4) <<
"can't resume ingress, ingressPaused=" <<
ingressPaused_ 1090 <<
", inResume_=" <<
inResume_ <<
" " << *
this;
1096 VLOG(4) <<
"skipping recursive resume loop " << *
this;
1111 VLOG(5) <<
"Processing deferred ingress callback of type " 1112 << callback.
getEvent() <<
" " << *
this;
1113 switch (callback.getEvent()) {
1115 LOG(FATAL) <<
"unreachable";
1121 unique_ptr<IOBuf>
data = callback.getBody();
1151 VLOG(4) <<
"asked to pause egress " << *this ;
1154 VLOG(4) <<
"egress already paused " << *this ;
1162 VLOG(4) <<
"asked to resume egress" << *
this;
1165 VLOG(4) <<
"egress already not paused" << *
this;
1175 VLOG(4) <<
"ratelim: Limit too low (" << bitsPerSecond <<
"), ignoring";
1197 int64_t deferredEgressBodyBytes =
1211 VLOG(4) <<
"transaction stalled by flow control" << *
this;
1221 if (handlerShouldBePaused) {
1267 VLOG(4) <<
"Cannot add a pushed txn to an unhandled txn";
1272 VLOG(4) <<
"Failed to create a handler for push transaction";
1283 LOG(
ERROR) <<
"Cannot add a exTxn to an unhandled txn";
1288 LOG(
ERROR) <<
"Failed to create a handler for ExTransaction";
1296 std::chrono::milliseconds transactionTimeout) {
1298 VLOG(4) <<
"HTTPTransaction: transaction timeout is set to " 1299 << std::chrono::duration_cast<std::chrono::milliseconds>(
1307 os <<
", streamID=" <<
id_;
1325 VLOG(4) <<
"Refusing to shrink the recv window";
1338 egressState_ != HTTPTransactionEgressSM::State::Start ||
1361 CHECK_GE(newPriority, 0);
1389 weighted_ += weighted;
1394 weighted_ += weighted;
1402 return total_ ? (double)weighted_ / (
double)total_ : 0;
1413 byTransactionBytesSent_.accumulate(value_ * bytes, bytes);
1417 bySessionBytesScheduled_.accumulate(value_ * bytes, bytes);
1423 byTransactionBytesSent_.getWeightedAverage();
1425 bySessionBytesScheduled_.getWeightedAverage();
1435 transactionBytesScheduled_(false) {}
1438 transactionBytesScheduled_ =
false;
1440 contentions_.value_ = contentions;
1441 depth_.value_ = depth;
1445 transactionBytesScheduled_ =
true;
1446 measured_weight_.accumulateWeighted(bytes);
1447 if (contentions_.value_) {
1448 contentions_.accumulateByTransactionBytes(bytes);
1450 VLOG(5) <<
"transfer " << bytes
1451 <<
" transaction body bytes while contentions count = 0 " 1454 depth_.accumulateByTransactionBytes(bytes);
1458 measured_weight_.accumulateTotal(bytes);
1459 expected_weight_.accumulate((ratio_ * bytes) + 0.5, bytes);
1460 if (contentions_.value_) {
1461 contentions_.accumulateBySessionBytes(bytes);
1463 VLOG(5) <<
"transfer " << bytes
1464 <<
" session body bytes while contentions count = 0 " 1467 depth_.accumulateBySessionBytes(bytes);
1475 return transactionBytesScheduled_;
1480 depth_.getSummary(summary.
depth_);
1492 bool transactionBytesScheduled_:1;
WeightedAverage contentions_
size_t sendDeferredBody(uint32_t maxEgress)
uint64_t numLimitedBytesEgressed_
virtual void lastByteAcked(std::chrono::milliseconds latency) noexcept=0
bool isPrioritySampled() const
#define FOLLY_SCOPED_TRACE_SECTION(arg,...)
std::unique_ptr< folly::IOBuf > split(size_t n)
virtual void resumeIngress()
virtual void resumeIngress(HTTPTransaction *txn) noexcept=0
void onError(const HTTPException &error)
virtual void onPushedTransaction(HTTPTransaction *) noexcept
uint64_t egressLimitBytesPerMs_
void processIngressChunkHeader(size_t length)
bool onWriteReady(uint32_t maxEgress, double ratio)
const TransportDirection direction_
uint32_t ingressHeaderTableSize_
std::chrono::milliseconds millisecondsBetween(std::chrono::time_point< ClockType > finish, std::chrono::time_point< ClockType > start)
bool isIngressEOMSeen() const
void processIngressHeadersComplete(std::unique_ptr< HTTPMessage > msg)
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
bool responseBodyMustBeEmpty(unsigned status)
uint16_t lastResponseStatus_
size_t chainLength() const
spdy::GoawayStatusCode statusCode
void rateLimitTimeoutExpired()
bool getPrioritySampleSummary(PrioritySampleSummary &summary) const
uint32_t getCapacity() const
ProxygenError getProxygenError() const
uint32_t streamDependency
virtual void onEgressPaused() noexcept=0
void onEgressBodyFirstByte()
bool isIngressComplete() const
void checkCreateDeferredIngress()
virtual void onChunkComplete() noexcept
void accumulateTotal(uint64_t total)
std::ostream & operator<<(std::ostream &os, const HeaderTable &table)
virtual void onEgressResumed() noexcept=0
void updateIngressHPACKTableInfo(HPACKTableInfo)
virtual void setReceiveWindow(uint32_t capacity)
void accumulateByTransactionBytes(uint64_t bytes)
std::unique_ptr< HTTPHeaders > trailers_
bool hasCodecStatusCode() const
virtual void onGoaway(ErrorCode) noexcept
virtual void sendHeaders(HTTPTransaction *txn, const HTTPMessage &headers, HTTPHeaderSize *size, bool eom) noexcept=0
void processIngressChunkComplete()
void onIngressChunkComplete()
virtual void onError(const HTTPException &error) noexcept=0
WeightedValue contentions_
uint16_t getStatusCode() const
virtual void bodyBytesReceived(size_t size) noexcept=0
virtual size_t sendChunkTerminator(HTTPTransaction *txn) noexcept=0
virtual Handle updatePriority(Handle handle, http2::PriorityUpdate pri, uint64_t *depth=nullptr)=0
uint32_t getOutstanding() const
proxygen::TimePoint startRateLimit_
virtual bool isDraining() const =0
virtual void detachTransaction() noexcept=0
void reset(bool useFlowControl, uint32_t receiveInitialWindowSize, uint32_t receiveStreamWindowSize, uint32_t sendInitialWindowSize)
constexpr detail::Map< Move > move
virtual HPACKTableInfo getHPACKTableInfo() const
void getSummary(HTTPTransaction::PrioritySampleSummary &summary) const
bool onExTransaction(HTTPTransaction *txn)
void setIdleTimeout(std::chrono::milliseconds transactionTimeout)
void accumulateWeighted(uint64_t weighted)
void setProxygenError(ProxygenError proxygenError)
virtual CodecProtocol getProtocol() const =0
virtual void onUpgrade(UpgradeProtocol protocol) noexcept=0
uint32_t ingressHeadersStored_
virtual void lastByteFlushed() noexcept=0
void onIngressBody(std::unique_ptr< folly::IOBuf > chain, uint16_t padding)
virtual void onChunkHeader(size_t) noexcept
void updateSessionBytesSheduled(uint64_t bytes)
uint32_t egressHeadersStored_
void accumulate(uint64_t weighted, uint64_t total)
std::unique_ptr< folly::IOBuf > move()
void notifyTransportPendingEgress()
bool hasPendingEOM() const
—— Concurrent Priority Queue Implementation ——
void setCodecStatusCode(ErrorCode statusCode)
virtual uint64_t calculateDepth(bool includeVirtual=true) const =0
bool isEgressEOMQueued() const
void updateSessionBytesSheduled(uint64_t bytes)
void onIngressChunkHeader(size_t length)
stop_watch< std::chrono::milliseconds > timer_
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
folly::Optional< HTTPCodec::ExAttributes > exAttributes_
virtual void recordTransactionStalled() noexcept=0
void updateTransactionBytesSent(uint64_t bytes)
uint32_t ingressBytesStored_
bool validateIngressStateTransition(HTTPTransactionIngressSM::Event)
uint32_t maxDeferredIngress_
bool reserve(uint32_t amount, bool strict=true)
void processIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
virtual void firstHeaderByteFlushed() noexcept=0
WeightedAccumulator measured_weight_
virtual void detach(HTTPTransaction *txn) noexcept=0
void scheduleTimeout(Callback *callback, std::chrono::milliseconds timeout)
void onIngressHeadersComplete(std::unique_ptr< HTTPMessage > msg)
folly::Optional< HTTPCodec::StreamID > assocStreamId_
bool mustQueueIngress() const
void onIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
PrioritySample(HTTPTransaction *tnx)
size_t sendBodyNow(std::unique_ptr< folly::IOBuf > body, size_t bodyLen, bool eom)
void onGoaway(ErrorCode code)
constexpr auto size(C const &c) -> decltype(c.size())
virtual void onBody(std::unique_ptr< folly::IOBuf > chain) noexcept=0
bool isExpectingIngress() const
void updateEgressHPACKTableInfo(HPACKTableInfo)
virtual void recordTransactionClosed() noexcept=0
static bool transit(State &state, Event event)
void updateTransactionBytesSent(uint64_t bytes)
int32_t getRecvToAck() const
void setEgressRateLimit(uint64_t bitsPerSecond)
virtual void notifyEgressBodyBuffered(int64_t bytes) noexcept=0
RateLimitCallback rateLimitCallback_
virtual void removeTransaction(Handle handle)=0
void updateAndSendPriority(int8_t newPriority)
void onIngressWindowUpdate(uint32_t amount)
virtual void recordTransactionOpened() noexcept=0
void onEgressLastByteAck(std::chrono::milliseconds latency)
virtual Handle addTransaction(HTTPCodec::StreamID id, http2::PriorityUpdate pri, HTTPTransaction *txn, bool permanent=false, uint64_t *depth=nullptr)=0
Direction getDirection() const
HTTPTransactionIngressSM::State ingressState_
bool onPushedTransaction(HTTPTransaction *txn)
folly::Optional< uint64_t > expectedContentLengthRemaining_
std::unique_ptr< PrioritySample > prioritySample_
bool isExpectingWindowUpdate() const
void updateHandlerPauseState()
void onDelayedDestroy(bool delayed) override
virtual size_t sendAbort(HTTPTransaction *txn, ErrorCode statusCode) noexcept=0
http2::PriorityUpdate priority_
HTTPSessionStats * stats_
std::list< Chunk > chunkHeaders_
void updateRatio(double ratio)
void onEgressBodyLastByte()
HTTPTransactionEgressSM::State egressState_
virtual size_t sendEOM(HTTPTransaction *txn, const HTTPHeaders *trailers) noexcept=0
GuardImpl guard(ErrorHandler &&handler)
ErrorCode getCodecStatusCode() const
virtual size_t sendWindowUpdate(HTTPTransaction *txn, uint32_t bytes) noexcept=0
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
WeightedAccumulator byTransactionBytesSent_
std::unique_ptr< std::queue< HTTPEvent > > deferredIngress_
HPACKTableInfo tableInfo_
HPACKTableInfo & getHPACKTableInfo()
std::set< HTTPCodec::StreamID > exTransactions_
virtual void describe(std::ostream &) const =0
virtual void pauseIngress(HTTPTransaction *txn) noexcept=0
virtual StreamID mapPriorityToDependency(uint8_t) const
virtual void sendBody(std::unique_ptr< folly::IOBuf > body)
uint32_t egressHeaderTableSize_
const Handler * getHandler() const
virtual void sendHeaders(const HTTPMessage &headers)
virtual void notifyIngressBodyProcessed(uint32_t bytes) noexcept=0
folly::HHWheelTimer * timer_
folly::Optional< uint64_t > expectedResponseLength_
virtual void onExTransaction(HTTPTransaction *) noexcept
HTTPHeaders & getHeaders()
void updateRelativeWeight(double ratio)
void describe(std::ostream &os) const
~HTTPTransaction() override
virtual void pauseIngress()
HTTP2PriorityQueueBase::Handle queueHandle_
bool isTransactionBytesScheduled() const
void updateContentionsCount(uint64_t contentions, uint64_t depth)
virtual bool extraResponseExpected() const
folly::Optional< HTTPCodec::StreamID > getControlStream() const
virtual size_t sendChunkHeader(HTTPTransaction *txn, size_t length) noexcept=0
std::size_t computeChainDataLength() const
void markEgressComplete()
TransportCallback * transportCallback_
virtual void firstByteFlushed() noexcept=0
std::set< HTTPCodec::StreamID > pushedTransactions_
virtual void sendHeadersWithEOM(const HTTPMessage &headers)
virtual void onHeadersComplete(std::unique_ptr< HTTPMessage > msg) noexcept=0
virtual size_t sendPriority(HTTPTransaction *txn, const http2::PriorityUpdate &pri) noexcept=0
virtual size_t sendBody(HTTPTransaction *txn, std::unique_ptr< folly::IOBuf >, bool eom, bool trackLastByteFlushed) noexcept=0
virtual void onEOM() noexcept=0
double getWeightedAverage() const
HTTPTransaction(TransportDirection direction, HTTPCodec::StreamID id, uint32_t seqNo, Transport &transport, HTTP2PriorityQueueBase &egressQueue, folly::HHWheelTimer *timer=nullptr, const folly::Optional< std::chrono::milliseconds > &defaultTimeout=folly::Optional< std::chrono::milliseconds >(), HTTPSessionStats *stats=nullptr, bool useFlowControl=false, uint32_t receiveInitialWindowSize=0, uint32_t sendInitialWindowSize=0, http2::PriorityUpdate=http2::DefaultPriority, folly::Optional< HTTPCodec::StreamID > assocStreamId=HTTPCodec::NoStream, folly::Optional< HTTPCodec::ExAttributes > exAttributes=HTTPCodec::NoExAttributes)
void accumulateBySessionBytes(uint64_t bytes)
WeightedAccumulator bySessionBytesScheduled_
bool setCapacity(uint32_t capacity)
AsyncFizzClient::UniquePtr transport_
void setPrioritySampled(bool sampled)
void onEgressTrackedByte()
bool enableLastByteFlushedTracking_
uint32_t egressBytesStored_
void onIngressUpgrade(UpgradeProtocol protocol)
virtual void headerBytesGenerated(HTTPHeaderSize &size) noexcept=0
void onIngressSetSendWindow(uint32_t newWindowSize)
std::chrono::time_point< ClockType > getCurrentTime()
WeightedAccumulator expected_weight_
bool firstHeaderByteSent_
bool maybeDelayForRateLimit()
folly::IOBufQueue deferredEgressBody_
virtual void onTrailers(std::unique_ptr< HTTPHeaders > trailers) noexcept=0
double byTransactionBytes_
folly::Optional< uint64_t > actualResponseLength_
FOLLY_CPP14_CONSTEXPR const Value & value() const &
virtual const HTTPCodec & getCodec() const noexcept=0
bool handlerEgressPaused_
void processIngressUpgrade(UpgradeProtocol protocol)
void processIngressBody(std::unique_ptr< folly::IOBuf > chain, size_t len)
bool free(uint32_t amount)
folly::Optional< std::chrono::milliseconds > transactionTimeout_
folly::Optional< HTTPMethod > getMethod() const
bool isRemoteInitiated() const
void markIngressComplete()
static uint8_t normalizePriority(int8_t pri)
HTTPCodec::StreamID getID() const
void onEgressHeaderFirstByte()
void onPriorityUpdate(const http2::PriorityUpdate &priority)
virtual void notifyPendingEgress() noexcept=0
void updateContentionsCount(uint64_t contentions)
static constexpr uint64_t data[1]
HTTP2PriorityQueueBase & egressQueue_
virtual void sendHeadersWithOptionalEOM(const HTTPMessage &headers, bool eom)
virtual void clearPendingEgress(Handle h)=0
virtual void trackedByteFlushed() noexcept
virtual void headerBytesReceived(const HTTPHeaderSize &size) noexcept=0
virtual void bodyBytesGenerated(size_t nbytes) noexcept=0
uint8_t pendingByteEvents_
void getSummary(HTTPTransaction::PrioritySampleSummary::WeightedAverage &wa) const
bool isEgressComplete() const
virtual void signalPendingEgress(Handle h)=0