12 #include <boost/heap/d_ary_heap.hpp> 97 std::chrono::milliseconds ttfb,
98 std::chrono::milliseconds ttlb,
149 virtual void detachTransaction() noexcept = 0;
157 virtual void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept = 0;
164 virtual void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept = 0;
189 virtual void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept
202 virtual void onEOM() noexcept = 0;
226 virtual void onEgressPaused() noexcept = 0;
232 virtual void onEgressResumed() noexcept = 0;
266 LOG(FATAL) <<
"push txn received headers";
270 LOG(FATAL) <<
"push txn received body";
274 LOG(FATAL) <<
"push txn received chunk header";
278 LOG(FATAL) <<
"push txn received chunk complete";
282 LOG(FATAL) <<
"push txn received trailers";
286 LOG(FATAL) <<
"push txn received EOM";
290 LOG(FATAL) <<
"push txn received upgrade";
294 LOG(FATAL) <<
"push txn received push txn";
303 virtual void firstHeaderByteFlushed()
noexcept = 0;
305 virtual void firstByteFlushed() noexcept = 0;
307 virtual void lastByteFlushed() noexcept = 0;
311 virtual void lastByteAcked(std::chrono::milliseconds latency) noexcept = 0;
315 virtual void headerBytesReceived(
const HTTPHeaderSize& size) noexcept = 0;
317 virtual void bodyBytesGenerated(
size_t nbytes) noexcept = 0;
319 virtual void bodyBytesReceived(
size_t size) noexcept = 0;
344 bool eom) noexcept = 0;
347 std::unique_ptr<folly::IOBuf>,
349 bool trackLastByteFlushed) noexcept = 0;
352 size_t length) noexcept = 0;
368 virtual void notifyPendingEgress() noexcept = 0;
372 virtual void notifyIngressBodyProcessed(
uint32_t bytes) noexcept = 0;
374 virtual void notifyEgressBodyBuffered(
int64_t bytes) noexcept = 0;
382 virtual void describe(std::ostream&)
const = 0;
395 virtual void drain() = 0;
397 virtual bool isDraining()
const = 0;
406 bool unidirectional) noexcept = 0;
408 virtual std::string getSecurityProtocol()
const = 0;
410 virtual void addWaitingForReplaySafety(
413 virtual void removeWaitingForReplaySafety(
416 virtual bool needToBlockForReplaySafety()
const = 0;
424 virtual bool isReplaySafe()
const = 0;
426 virtual void setHTTP2PrioritiesEnabled(
bool enabled) = 0;
427 virtual bool getHTTP2PrioritiesEnabled()
const = 0;
430 getHTTPPriority(
uint8_t level) = 0;
453 bool useFlowControl =
false,
454 uint32_t receiveInitialWindowSize = 0,
464 void reset(
bool useFlowControl,
480 handler_->setTransaction(
this);
496 egressCalls_ > 0 ? cumulativeRatio_ / egressCalls_ : 0);
500 return priorityFallback_;
508 return ingressState_;
555 return (lastResponseStatus_ >= 100 && lastResponseStatus_ < 200)
556 && lastResponseStatus_ != 101;
567 virtual void setReceiveWindow(
uint32_t capacity);
577 return maxDeferredIngress_;
583 void onIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg);
589 void onIngressBody(std::unique_ptr<folly::IOBuf> chain,
uint16_t padding);
594 void onIngressChunkHeader(
size_t length);
599 void onIngressChunkComplete();
604 void onIngressTrailers(std::unique_ptr<HTTPHeaders> trailers);
643 void onIngressTimeout();
650 void onIngressWindowUpdate(
uint32_t amount);
657 void onIngressSetSendWindow(
uint32_t newWindowSize);
663 bool onWriteReady(
uint32_t maxEgress,
double ratio);
668 void onEgressTimeout();
673 void onEgressHeaderFirstByte();
678 void onEgressBodyFirstByte();
683 void onEgressBodyLastByte();
688 void onEgressTrackedByte();
696 void onEgressLastByteAck(std::chrono::milliseconds latency);
703 transportCallback_ = cb;
710 return ingressState_ != HTTPTransactionIngressSM::State::Start;
718 return ingressState_ == HTTPTransactionIngressSM::State::EOMQueued;
725 return ingressState_ == HTTPTransactionIngressSM::State::ReceivingDone;
732 return isIngressEOMQueued() || isIngressComplete();
739 return egressState_ != HTTPTransactionEgressSM::State::Start;
747 return egressState_ == HTTPTransactionEgressSM::State::EOMQueued;
754 return egressState_ == HTTPTransactionEgressSM::State::SendingDone;
769 return isEgressEOMQueued() || isEgressComplete();
784 HTTPTransactionEgressSM::Event::sendHeaders)
785 && (isUpstream() || lastResponseStatus_ == 0 || extraResponseExpected());
800 virtual void sendHeaders(
const HTTPMessage& headers);
801 virtual void sendHeadersWithEOM(
const HTTPMessage& headers);
802 virtual void sendHeadersWithOptionalEOM(
const HTTPMessage& headers,
bool eom);
816 virtual void sendBody(std::unique_ptr<folly::IOBuf> body);
827 egressState_, HTTPTransactionEgressSM::Event::sendChunkHeader));
829 if (!
transport_.getCodec().supportsParallelRequests()) {
830 chunkHeaders_.emplace_back(
Chunk(length));
842 egressState_, HTTPTransactionEgressSM::Event::sendChunkTerminator));
855 egressState_, HTTPTransactionEgressSM::Event::sendTrailers));
873 virtual void sendEOM();
884 virtual void sendAbort();
895 virtual void pauseIngress();
900 virtual void resumeIngress();
936 void setEgressRateLimit(
uint64_t bitsPerSecond);
955 transport_.getCodec().supportsPushTransactions();
967 if (isEgressEOMSeen()) {
970 auto txn =
transport_.newPushedTransaction(id_, handler);
972 pushedTransactions_.insert(txn->getID());
985 bool unidirectional =
false) {
986 auto txn =
transport_.newExTransaction(handler, id_, unidirectional);
988 exTransactions_.insert(txn->getID());
1012 return assocStreamId_.has_value();
1016 return exAttributes_.has_value();
1020 return isExTransaction() && exAttributes_->unidirectional;
1028 void setIdleTimeout(std::chrono::milliseconds transactionTimeout);
1034 return transactionTimeout_.hasValue();
1042 return transactionTimeout_.value();
1049 return assocStreamId_;
1065 return exAttributes_;
1072 return pushedTransactions_;
1079 return exTransactions_;
1087 pushedTransactions_.erase(pushStreamId);
1094 exTransactions_.erase(exStreamId);
1101 if (
timer_ && hasIdleTimeout()) {
1102 timer_->scheduleTimeout(
this, transactionTimeout_.value());
1111 bool ret = firstByteSent_;
1112 firstByteSent_ =
true;
1117 bool ret = inActiveSet_;
1118 inActiveSet_ =
false;
1127 bool ret = firstHeaderByteSent_;
1128 firstHeaderByteSent_ =
true;
1139 pendingByteEvents_++;
1144 CHECK_GT(pendingByteEvents_, 0);
1145 pendingByteEvents_--;
1159 void describe(std::ostream& os)
const;
1164 void updateAndSendPriority(
int8_t newPriority);
1178 transport_.addWaitingForReplaySafety(callback);
1186 transport_.removeWaitingForReplaySafety(callback);
1190 return transport_.needToBlockForReplaySafety();
1196 return prioritySample_ !=
nullptr;
1199 void setPrioritySampled(
bool sampled);
1200 void updateContentionsCount(
uint64_t contentions);
1201 void updateRelativeWeight(
double ratio);
1202 void updateSessionBytesSheduled(
uint64_t bytes);
1203 void updateTransactionBytesSent(
uint64_t bytes);
1207 double byTransactionBytes_{0};
1208 double bySessionBytes_{0};
1221 return deferredEgressBody_.chainLength() > 0;
1225 enableLastByteFlushedTracking_ = enabled;
1232 void onDelayedDestroy(
bool delayed)
override;
1238 void updateHandlerPauseState();
1247 bool mustQueueIngress()
const;
1253 void checkCreateDeferredIngress();
1262 void processIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg);
1263 void processIngressBody(std::unique_ptr<folly::IOBuf> chain,
size_t len);
1264 void processIngressChunkHeader(
size_t length);
1265 void processIngressChunkComplete();
1266 void processIngressTrailers(std::unique_ptr<HTTPHeaders> trailers);
1268 void processIngressEOM();
1270 void sendBodyFlowControlled(std::unique_ptr<folly::IOBuf> body =
nullptr);
1271 size_t sendBodyNow(std::unique_ptr<folly::IOBuf> body,
size_t bodyLen,
1273 size_t sendEOMNow();
1274 void onDeltaSendWindowSize(
int32_t windowDelta);
1276 void notifyTransportPendingEgress();
1278 size_t sendDeferredBody(
uint32_t maxEgress);
1280 bool maybeDelayForRateLimit();
1285 DCHECK(isEnqueued());
1286 egressQueue_.clearPendingEgress(queueHandle_);
1290 return deferredEgressBody_.chainLength() == 0 &&
1291 isEgressEOMQueued();
1294 bool isExpectingIngress()
const;
1296 bool isExpectingWindowUpdate()
const;
1298 void updateReadTimeout();
1304 void markIngressComplete();
1310 void markEgressComplete();
1323 void flushWindowUpdate();
1325 void rateLimitTimeoutExpired();
1333 txn_.rateLimitTimeoutExpired();
1392 explicit Chunk(
size_t inLength) : length(inLength), headerSent(false) {}
1451 double cumulativeRatio_{0};
1467 bool ingressPaused_:1;
1468 bool egressPaused_:1;
1469 bool flowControlPaused_:1;
1470 bool handlerEgressPaused_:1;
1471 bool egressRateLimited_:1;
1472 bool useFlowControl_:1;
1475 bool firstByteSent_:1;
1476 bool firstHeaderByteSent_:1;
1478 bool inActiveSet_:1;
1479 bool ingressErrorSeen_:1;
1480 bool priorityFallback_:1;
1481 bool headRequest_:1;
1482 bool enableLastByteFlushedTracking_:1;
WeightedAverage contentions_
bool isPrioritySampled() const
virtual void setHandler(Handler *handler)
virtual void onPushedTransaction(HTTPTransaction *) noexcept
const TransportDirection direction_
static State getNewInstance()
bool isIngressEOMSeen() const
folly::Optional< HTTPCodec::StreamID > getAssocTxnId() const
const Transport & getTransport() const
spdy::GoawayStatusCode statusCode
void timeoutExpired() noexceptoverride
void onChunkComplete() noexceptfinal
bool isIngressComplete() const
void onChunkHeader(size_t) noexceptfinal
uint64_t ingressBodyBytes
virtual void onChunkComplete() noexcept
std::ostream & operator<<(std::ostream &os, const HeaderTable &table)
virtual void sendTrailers(const HTTPHeaders &trailers)
std::unique_ptr< HTTPHeaders > trailers_
void setLastByteFlushedTrackingEnabled(bool enabled)
virtual void onGoaway(ErrorCode) noexcept
bool isEgressStarted() const
http2::PriorityUpdate getPriority() const
proxygen::TimePoint startRateLimit_
std::chrono::milliseconds timeToFirstByte
void onHeadersComplete(std::unique_ptr< HTTPMessage >) noexceptfinal
void onPushedTransaction(HTTPTransaction *) noexceptfinal
const wangle::TransportInfo & getSetupTransportInfo() const noexcept
bool isEgressPaused() const
virtual void onChunkHeader(size_t) noexcept
HTTPSessionStats * getSessionStats() const
bool hasPendingEOM() const
RateLimitCallback(HTTPTransaction &txn)
bool isEgressEOMQueued() const
requires E e noexcept(noexcept(s.error(std::move(e))))
bool isIngressStarted() const
stop_watch< std::chrono::milliseconds > timer_
std::set< HTTPCodec::StreamID > getExTransactions() const
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
folly::Optional< HTTPCodec::ExAttributes > exAttributes_
bool testAndClearActive()
bool isIngressEOMQueued() const
void getCurrentTransportInfo(wangle::TransportInfo *tinfo) const
const std::set< HTTPCodec::StreamID > & getPushedTransactions() const
bool testAndSetFirstHeaderByteSent()
folly::Optional< HTTPCodec::StreamID > assocStreamId_
void handler(int, siginfo_t *, void *)
Transport & getTransport()
bool isFlowControlPaused() const
uint64_t egressHeaderBytes
bool isEgressEOMSeen() const
void setTransportCallback(TransportCallback *cb)
constexpr auto size(C const &c) -> decltype(c.size())
void callbackCanceled() noexceptoverride
bool isExTransaction() const
void onTrailers(std::unique_ptr< HTTPHeaders >) noexceptfinal
bool hasIdleTimeout() const
static bool transit(State &state, Event event)
static Options cacheChainLength()
bool isIngressPaused() const
void getLocalAddress(folly::SocketAddress &addr) const
folly::Optional< uint64_t > expectedContentLengthRemaining_
std::unique_ptr< PrioritySample > prioritySample_
virtual ~HTTPTransactionHandler()
http2::PriorityUpdate priority_
std::list< Chunk > chunkHeaders_
HTTPTransactionEgressSM::State getEgressState() const
void incrementPendingByteEvents()
virtual HTTPTransaction * newExTransaction(HTTPTransactionHandler *handler, bool unidirectional=false)
static uint64_t egressBufferLimit_
std::unique_ptr< std::queue< HTTPEvent > > deferredIngress_
HPACKTableInfo tableInfo_
std::set< HTTPCodec::StreamID > exTransactions_
bool getPriorityFallback() const
uint64_t ingressHeaderBytes
void removePushedTransaction(HTTPCodec::StreamID pushStreamId)
std::chrono::milliseconds getIdleTimeout() const
const Handler * getHandler() const
folly::HHWheelTimer * timer_
folly::Optional< uint64_t > expectedResponseLength_
uint32_t getSequenceNumber() const
virtual void onExTransaction(HTTPTransaction *) noexcept
virtual bool needToBlockForReplaySafety() const
virtual bool canSendHeaders() const
void onEOM() noexceptfinal
HTTP2PriorityQueueBase::Handle queueHandle_
virtual HTTPTransaction * newPushedTransaction(HTTPPushTransactionHandler *handler)
virtual bool extraResponseExpected() const
folly::Optional< HTTPCodec::StreamID > getControlStream() const
static bool canTransit(const State state, Event event)
const PriorityUpdate DefaultPriority
uint32_t getMaxDeferredSize()
virtual void removeWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback *callback)
SteadyClock::time_point TimePoint
std::tuple< uint64_t, uint64_t, double > getPrioritySummary() const
std::set< HTTPCodec::StreamID > pushedTransactions_
bool isUnidirectional() const
AsyncFizzClient::UniquePtr transport_
virtual void sendChunkTerminator()
virtual const Window & getReceiveWindow() const
static const folly::Optional< StreamID > NoStream
bool hasPendingBody() const
folly::Optional< std::chrono::milliseconds > transactionTimeout_
~HTTPPushTransactionHandler() override
folly::Optional< HTTPCodec::ExAttributes > getExAttributes() const
bool isRemoteInitiated() const
HTTPCodec::StreamID getID() const
void timeoutExpired() noexceptoverride
const folly::SocketAddress & getPeerAddress() const noexcept
virtual void addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback *callback)
bool isDownstream() const
std::unique_ptr< Codec > getCodec(CodecType type, int level)
HTTP2PriorityQueueBase & egressQueue_
virtual ~HTTPTransactionTransportCallback()
ThreadPoolListHook * addr
void removeExTransaction(HTTPCodec::StreamID exStreamId)
bool testAndSetFirstByteSent()
void onUpgrade(UpgradeProtocol) noexceptfinal
bool supportsPushTransactions() const
virtual void trackedByteFlushed() noexcept
void onBody(std::unique_ptr< folly::IOBuf >) noexceptfinal
const folly::SocketAddress & getLocalAddress() const noexcept
void getPeerAddress(folly::SocketAddress &addr) const
std::chrono::milliseconds timeToLastByte
virtual void sendChunkHeader(size_t length)
TransactionInfo(std::chrono::milliseconds ttfb, std::chrono::milliseconds ttlb, uint64_t eHeader, uint64_t inHeader, uint64_t eBody, uint64_t inBody, bool completed)
void decrementPendingByteEvents()
HTTPTransactionIngressSM::State getIngressState() const
static const folly::Optional< ExAttributes > NoExAttributes
bool isEgressComplete() const