44 using std::unique_ptr;
49 static const uint32_t kWriteReadyMax = 65536;
53 static const uint32_t kMaxWritesPerLoop = 32;
56 "EXPORTER HTTP CERTIFICATE client";
58 "EXPORTER HTTP CERTIFICATE server";
116 AsyncTransportWrapper::UniquePtr sock,
120 unique_ptr<HTTPCodec>
codec,
124 localAddr, peerAddr, controller,
std::move(codec),
125 tinfo, infoCallback) {
130 AsyncTransportWrapper::UniquePtr sock,
134 unique_ptr<HTTPCodec> codec,
152 reads_(SocketState::PAUSED),
153 writes_(SocketState::UNPAUSED),
176 controllerPtr->getSessionFlowControlTimeout());
180 if (!
sock_->isReplaySafe()) {
181 sock_->setReplaySafetyCallback(
this);
188 std::unique_ptr<folly::IOBuf>
ekm;
191 label = kClientLabel;
193 label = kServerLabel;
197 ekm = fizzBase->getEkm(label,
nullptr, settingLen);
199 VLOG(4) <<
"Underlying transport does not support secondary " 201 return certAuthSettingVal;
206 certAuthSettingVal = (ekmVal & 0x3fffffff) | 0x80000000;
208 return certAuthSettingVal;
214 std::unique_ptr<folly::IOBuf>
ekm;
217 label = kServerLabel;
219 label = kClientLabel;
223 ekm = fizzBase->getEkm(label,
nullptr, settingLen);
225 VLOG(4) <<
"Underlying transport does not support secondary " 232 certAuthSettingVal = (ekmVal & 0x3fffffff) | 0x80000000;
236 if (certAuthSettingVal == value) {
261 if (certAuthSettingVal != 0) {
279 VLOG(4) << *
this <<
" closing";
284 DCHECK(!
sock_->getReadCallback());
318 std::shared_ptr<ByteEventTracker> byteEventTracker) {
337 size_t receiveStreamWindowSize,
338 size_t receiveSessionWindowSize) {
352 VLOG_IF(4,
started_) <<
"Must flush egress settings to peer";
355 for (
const auto& setting: inSettings) {
356 settings->
setSetting(setting.id, setting.value);
380 VLOG(3) <<
"session-level timeout on " << *
this;
387 "ignoring session timeout, transaction timeout imminent";
397 "ignoring session timeout, no transactions awaiting reads";
402 VLOG(4) << *
this <<
" Timeout with nothing pending";
415 VLOG(4) <<
"Write timeout for " << *
this;
426 VLOG(4) <<
"Flow control timeout for " << *
this;
459 VLOG(4) << *
this <<
" notified pending shutdown";
494 VLOG(4) <<
"dropping " << *
this;
496 VLOG(4) << *
this <<
" already shutdown";
528 pair<void*,uint32_t> readSpace =
530 *buf = readSpace.first;
531 *bufSize = readSpace.second;
537 "HTTPSession - readDataAvailable",
"readSize", readSize);
538 VLOG(10) <<
"read completed on " << *
this <<
", bytes=" << readSize;
558 size_t readSize =
readBuf->computeChainDataLength();
560 "HTTPSession - readBufferAvailable",
"readSize", readSize);
561 VLOG(5) <<
"read completed on " << *
this <<
", bytes=" << readSize;
584 const IOBuf* currentReadBuf;
592 currentReadBuf->
length() != 0)) {
596 if (bytesParsed == 0) {
608 VLOG(4) <<
"EOF on " << *
this;
628 VLOG(4) <<
"read error on " << *
this <<
": " << ex.what();
674 auto txnID = txn->
getID();
685 CHECK(
handler && controlStream > 0);
689 <<
" does not support ExTransaction";
693 LOG(
ERROR) <<
"cannot support any more transactions in " << *
this;
730 VLOG(4) << *
this <<
" starting streamID=" << txn->getID()
747 h2Pri.
weight = std::get<2>(*res);
759 VLOG(4) <<
"processing new msg streamID=" << streamID <<
" " << *
this;
770 "Downstream attempts to send ingress, abort.");
803 DCHECK(it->second.isIngressEOMSeen());
804 it->second.pauseIngress();
816 VLOG(4) <<
"processing new push promise streamID=" << streamID
817 <<
" on assocStreamID=" << assocStreamID <<
" " << *
this;
821 if (assocStreamID == 0) {
822 VLOG(2) <<
"push promise " << streamID <<
" should be associated with " 823 <<
"an active stream=" << assocStreamID <<
" " << *
this;
829 VLOG(2) <<
"push promise cannot be sent to upstream " << *
this;
836 VLOG(2) <<
"cannot find the assocTxn=" << assocTxn
837 <<
", or assoc stream is already closed by upstream" << *
this;
850 VLOG(1) <<
"Failed to add pushed txn " << streamID
851 <<
" to assoc txn " << assocStreamID <<
" on " << *
this;
853 folly::to<std::string>(
"Failed to add pushed transaction ", streamID));
864 VLOG(4) <<
"processing new ExMessage=" << streamID
865 <<
" on controlStream=" << controlStream <<
", " << *
this;
869 if (controlStream == 0) {
870 LOG(
ERROR) <<
"ExMessage=" << streamID <<
" should have an active control " 871 <<
"stream=" << controlStream <<
", " << *
this;
879 LOG(
ERROR) <<
"no control stream=" << controlStream <<
", " << *
this;
900 unique_ptr<HTTPMessage> msg) {
903 VLOG(4) <<
"processing ingress headers complete for " << *
this <<
919 const char* sslCipher =
925 if (controlStreamID) {
928 VLOG(2) <<
"txn=" << streamID <<
" with a broken controlTxn=" 929 << *controlStreamID <<
" " << *
this;
932 folly::to<std::string>(
"broken controlTxn ", *controlStreamID));
939 VLOG(2) <<
"Failed to add exTxn=" << streamID
940 <<
" to controlTxn=" << *controlStreamID <<
", " << *
this;
943 folly::to<std::string>(
"Fail to add exTxn ", streamID));
959 if (!txn->getHandler()) {
966 txn->onIngressHeadersComplete(
std::move(msg));
971 unique_ptr<IOBuf> chain,
uint16_t padding) {
988 VLOG(4) << *
this <<
" pausing due to read limit exceeded.";
1031 unique_ptr<HTTPHeaders> trailers) {
1046 VLOG(4) <<
"processing ingress message complete for " << *
this <<
1065 const bool txnIngressFinished =
1067 if (txnIngressFinished) {
1092 txnIngressFinished &&
1094 VLOG(4) << *
this <<
" cannot reuse ingress";
1105 VLOG(4) <<
"Error on " << *
this <<
", streamID=" << streamID
1129 if (error.hasHttpStatusCode() && streamID != 0) {
1140 }
else if (newTxn) {
1143 VLOG(4) << *
this <<
" parse error with invalid transaction";
1150 txn->
getEgressState() == HTTPTransactionEgressSM::State::Start) {
1157 VLOG(4) << *
this <<
"shutdown from onError";
1165 VLOG(4) <<
"stream abort on " << *
this <<
", streamID=" << streamID
1169 VLOG(4) << *
this <<
" abort for unrecognized transaction, streamID= " 1174 folly::to<std::string>(
"Stream aborted, streamID=",
1185 DCHECK(pushTxn !=
nullptr);
1186 pushTxn->onError(ex);
1190 for (
auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1201 std::unique_ptr<folly::IOBuf> debugData) {
1218 vector<HTTPCodec::StreamID> ids;
1243 string debugInfo = (debugData) ?
1244 folly::to<string>(
" with debug info: ", (
char*)debugData->
data()) :
"";
1247 " on transaction id: ", *firstStream,
1253 ids.push_back(*firstStream);
1260 VLOG(4) << *
this <<
" got ping request with id=" << uniqueID;
1279 VLOG(4) << *
this <<
" got ping reply with id=" << uniqueID;
1287 VLOG(4) << *
this <<
" got window update on streamID=" << streamID <<
" for " 1288 << amount <<
" bytes.";
1304 for (
auto& setting : settings) {
1324 VLOG(4) << *
this <<
" received settings ack";
1340 txn->onPriorityUpdate(h2Pri);
1348 std::unique_ptr<IOBuf> authRequest) {
1350 VLOG(4) <<
"CERTIFICATE_REQUEST on" << *
this <<
", requestId=" << requestId;
1352 std::pair<uint16_t, std::unique_ptr<folly::IOBuf>> authenticator;
1369 VLOG(4) <<
"Underlying transport does not support secondary " 1374 authenticator.first,
1381 std::unique_ptr<IOBuf> authenticator) {
1383 VLOG(4) <<
"CERTIFICATE on" << *
this <<
", certId=" << certId;
1385 bool isValid =
false;
1402 VLOG(4) <<
"Underlying transport does not support secondary " 1407 VLOG(4) <<
"Successfully validated the authenticator provided by the peer.";
1409 VLOG(4) <<
"Failed to validate the authenticator provided by the peer";
1416 CHECK_EQ(streamID, 1);
1429 sock_->getEventBase()->runInLoop([oldCodec =
std::move(oldCodec)] () {});
1476 VLOG(4) << *
this <<
" got send window size adjustment. new=" << windowSize;
1482 VLOG(4) << *
this <<
" got new maximum number of concurrent txns " 1483 <<
"we can initiate: " << maxTxns;
1502 VLOG(4) << *
this <<
" pausing streamID=" << txn->getID() <<
1506 auto exTxns = txn->getExTransactions();
1507 for (
auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1510 exTxn->pauseIngress();
1520 VLOG(4) << *
this <<
" resuming streamID=" << txn->getID() <<
1523 auto exTxns = txn->getExTransactions();
1524 for (
auto it = exTxns.begin(); it != exTxns.end(); ++it) {
1527 exTxn->resumeIngress();
1542 VLOG(3) <<
"Transaction timeout for streamID=" << txn->getID();
1549 if (!txn->getHandler() &&
1550 txn->getEgressState() == HTTPTransactionEgressSM::State::Start) {
1551 VLOG(4) << *
this <<
" Timed out receiving headers";
1565 VLOG(4) << *
this <<
" creating direct error handler";
1573 txn->onIngressTimeout();
1581 unique_ptr<IOBuf> goawayBuf;
1590 if (
isUpstream() || (txn->isPushed() && headers.isRequest())) {
1594 txn->onPriorityUpdate(pri);
1600 auto exAttributes = txn->getExAttributes();
1601 auto assocStream = txn->getAssocTxnId();
1609 }
else if (headers.isRequest() && assocStream) {
1627 if (
isDownstream() && headers.isResponse() && txn->isPushed()) {
1633 bool shouldAddFirstHeaderByteEvent =
isUpstream() ||
1635 if (shouldAddFirstHeaderByteEvent && newOffset > oldOffset &&
1641 VLOG(4) << *
this <<
" sending headers, size=" <<
size->compressed
1642 <<
", uncompressedSize=" <<
size->uncompressed;
1645 VLOG(4) << *
this <<
" moved GOAWAY to end of writeBuf";
1668 std::unique_ptr<folly::IOBuf> body,
1670 bool trackLastByteFlushed)
noexcept {
1672 size_t bodyLen = body ? body->computeChainDataLength(): 0;
1690 VLOG(5) << *
this <<
" sending EOM in body for streamID=" << txn->getID();
1733 VLOG(4) << *
this <<
" resetting egress after this message";
1764 VLOG(4) << *
this <<
" sending EOM for streamID=" << txn->getID()
1765 <<
" trailers=" << (trailers ?
"yes" :
"no");
1767 size_t encodedSize = 0;
1775 if (!http2Trailers) {
1789 VLOG(4) << *
this <<
" sending abort for streamID=" << txn->getID();
1817 std::unique_ptr<SecondaryAuthManager> secondAuthManager) {
1830 std::unique_ptr<folly::IOBuf> certificateRequestContext,
1831 std::vector<fizz::Extension> extensions) {
1837 if (ingressSettings && egressSettings) {
1842 VLOG(4) <<
"Secondary certificate authentication is not supported.";
1850 if (encodedSize > 0) {
1853 VLOG(4) <<
"Failed to generate CERTIFICATE_REQUEST frame.";
1880 size_t oldStreamCount,
uint32_t txnSeqn) {
1885 DCHECK_EQ(nextTxn.getSequenceNumber(), txnSeqn + 1);
1886 DCHECK(!nextTxn.isIngressComplete());
1887 DCHECK(nextTxn.isIngressPaused());
1888 VLOG(4) <<
"Resuming paused pipelined txn " << nextTxn;
1889 nextTxn.resumeIngress();
1899 auto txnSeqn = txn->getSequenceNumber();
1903 if (txn->isIngressPaused()) {
1906 VLOG(4) << *
this <<
" detached paused transaction=" <<
streamID;
1910 VLOG(4) << *
this <<
" removing streamID=" << streamID <<
1912 CHECK_GT(liveTransactions_, 0);
1913 liveTransactions_--;
1915 if (txn->isPushed()) {
1921 if (txn->getControlStream()) {
2005 sock_->getEventBase()->runInLoop(
this);
2030 #if defined(__linux__) || defined(__FreeBSD__) 2032 if (tinfo->tcpinfo.tcpi_rtt > 0) {
2047 VLOG(4) <<
"skipping write during this loop, numActiveWrites_=" <<
2058 VLOG(4) <<
"Session-level send window is full, skipping remaining " 2059 <<
"body writes this loop";
2069 if (txnMaxToSend == 0) {
2084 uint32_t txnAllowed = txnPair.second * toSend;
2085 if (nextEgressResults_.size() > 1) {
2091 if (txnAllowed == 0) {
2093 VLOG(4) << *
this <<
" breaking egress loop on 0 txnAllowed";
2097 VLOG(4) << *
this <<
" egressing txnID=" << txnPair.first->getID() <<
2098 " allowed=" << txnAllowed;
2099 txnPair.first->onWriteReady(txnAllowed, txnPair.second);
2101 nextEgressResults_.clear();
2112 VLOG(5) << *
this <<
" writeBuf_.chainLength(): " 2118 VLOG(5) << *
this <<
" splitting " << needed <<
" bytes out of a " 2152 VLOG(5) << *
this <<
" in loop callback";
2154 for (
uint32_t i = 0;
i < kMaxWritesPerLoop; ++
i) {
2171 <<
" bytes of egress to be written: " << len
2172 <<
" cork:" << cork <<
" eom:" << eom;
2194 VLOG(4) << *
this <<
" writing " << len <<
", activeWrites=" 2216 sock_->setReadCB(
this);
2230 VLOG(5) << *
this <<
" scheduling write callback";
2231 sock_->getEventBase()->runInLoop(
this);
2239 VLOG(3) <<
"Pausing egress for " << *
this;
2240 writes_ = SocketState::PAUSED;
2243 VLOG(3) <<
"Resuming egress for " << *
this;
2244 writes_ = SocketState::UNPAUSED;
2260 VLOG(3) <<
"Pausing txn egress for " << *
this <<
" deferred";
2263 VLOG(3) <<
"Pausing txn egress for " << *
this;
2270 VLOG(3) <<
"Cancel deferred txn egress pause for " << *
this;
2273 VLOG(3) <<
"Ignoring redundant resume for " << *
this;
2276 VLOG(3) <<
"Resuming txn egress for " << *
this;
2284 bool shutdownWrites,
2291 VLOG(4) <<
"shutdown request for " << *
this <<
": reads=" 2293 <<
"), writes=" << shutdownWrites <<
" (currently " 2296 bool notifyEgressShutdown =
false;
2297 bool notifyIngressShutdown =
false;
2302 }
else if (
sock_->error()) {
2303 VLOG(3) <<
"shutdown request for " << *
this 2304 <<
" on bad socket. Shutting down writes too.";
2310 shutdownWrites =
true;
2320 VLOG(4) << *
this <<
" Converting read shutdown to read/write due to" 2322 shutdownWrites =
true;
2338 VLOG(4) << *
this <<
" writes drained, sending RST";
2340 shutdownReads =
true;
2342 VLOG(4) << *
this <<
" writes drained, closing";
2343 sock_->shutdownWriteNow();
2345 notifyEgressShutdown =
true;
2348 notifyEgressShutdown =
true;
2353 notifyIngressShutdown =
true;
2355 sock_->setReadCB(
nullptr);
2372 if (notifyIngressShutdown || notifyEgressShutdown) {
2373 auto dir = (notifyIngressShutdown && notifyEgressShutdown)
2379 folly::to<std::string>(
"Shutdown transport: ",
getErrorString(error),
2380 errorMsg.empty() ?
"" :
" ", errorMsg,
", ",
2395 VLOG(4) <<
"shutdownTransportWithReset";
2398 sock_->setReadCB(
nullptr);
2409 VLOG(4) << *
this <<
" cancel write timer";
2436 VLOG(10) << *
this <<
" checking for shutdown, readShutdown=" 2445 VLOG(4) <<
"destroying " << *
this;
2446 sock_->setReadCB(
nullptr);
2453 sock_->closeWithReset();
2464 VLOG(4) << *
this <<
" draining";
2474 VLOG(4) << *
this <<
" shutdown from drain";
2478 VLOG(4) << *
this <<
" already draining";
2577 std::piecewise_construct,
2578 std::forward_as_tuple(streamID),
2579 std::forward_as_tuple(
2591 CHECK(matchPair.second) <<
"Emplacement failed, despite earlier " 2602 if (stats !=
nullptr) {
2607 VLOG(5) << *
this <<
" adding streamID=" << txn->
getID()
2637 VLOG(10) <<
"Cancel write timer on last successful write";
2640 VLOG(10) <<
"Refresh write timer on writeSuccess";
2663 VLOG(4) << *
this <<
" shutdown from onWriteSuccess";
2682 VLOG(4) <<
"Egress limit reached, shutting down " 2692 VLOG(4) << *
this <<
" write error: " << ex.what();
2727 VLOG(4) << *
this <<
" session layer parse error. Terminate the session.";
2729 std::unique_ptr<folly::IOBuf> errorMsg =
2744 VLOG(4) << *
this <<
" parse error with new transaction";
2770 VLOG(4) << *
this <<
": pausing reads";
2775 sock_->setReadCB(
nullptr);
2776 reads_ = SocketState::PAUSED;
2790 VLOG(4) << *
this <<
": resuming reads";
2792 reads_ = SocketState::UNPAUSED;
2795 sock_->getEventBase()->runInLoop(
this);
2801 VLOG(10) << __PRETTY_FUNCTION__
2815 std::vector<HTTPCodec::StreamID> ids;
2817 ids.push_back(txn.first);
2823 const std::vector<HTTPCodec::StreamID>& ids,
2827 if (!errorMsg.empty()) {
2828 extraErrorMsg = folly::to<std::string>(
". ", errorMsg);
2831 for (
auto id: ids) {
2834 " on transaction id: ",
id,
2845 if (txn !=
nullptr) {
2857 txn->resumeEgress();
2861 auto stopFn = [
this] {
2868 VLOG(3) <<
"Pausing txn egress for " << *
this;
2882 VLOG(4) << *
this <<
" session stalled by flow control";
2892 if (timeout != std::chrono::milliseconds(0)) {
2905 VLOG(4) << *
this <<
" getGracefulGoawayAck is reusable and not draining";
2912 LOG(
ERROR) <<
"Invalid stream on non-parallel codec.";
2917 folly::to<std::string>(
"invalid stream=", stream));
2950 sock_->setReplaySafetyCallback(
nullptr);
2957 callback->onReplaySafe();
2959 waitingForReplaySafety_.clear();
2964 if (!
sock_->isEorTrackingEnabled() || !eomTracked) {
2968 if (eomOffset !=
sock_->getAppBytesWritten()) {
2969 VLOG(2) <<
"tracking ack to last app byte " << eomOffset
2970 <<
" while " <<
sock_->getAppBytesWritten()
2971 <<
" app bytes have already been written";
2975 VLOG(5) <<
"tracking raw last byte " <<
sock_->getRawBytesWritten()
2976 <<
" while the app last byte is " << eomOffset;
size_t sendAbort(HTTPTransaction *txn, ErrorCode statusCode) noexceptoverride
size_t sendBody(HTTPTransaction *txn, std::unique_ptr< folly::IOBuf >, bool includeEOM, bool trackLastByteFlushed) noexceptoverride
size_t getPipelineStreamCount() const
bool getCurrentTransportInfo(wangle::TransportInfo *tinfo) override
#define FOLLY_SCOPED_TRACE_SECTION(arg,...)
std::unique_ptr< folly::IOBuf > split(size_t n)
bool readsUnpaused() const
HTTPTransaction * createTransaction(HTTPCodec::StreamID streamID, const folly::Optional< HTTPCodec::StreamID > &assocStreamID, const folly::Optional< HTTPCodec::ExAttributes > &exAttributes, const http2::PriorityUpdate &priority=http2::DefaultPriority)
virtual const HTTPSettings * getIngressSettings() const
virtual void setHandler(Handler *handler)
void onCertificate(uint16_t certId, std::unique_ptr< folly::IOBuf > authenticator) override
void shutdownTransport(bool shutdownReads=true, bool shutdownWrites=true, const std::string &errorMsg="")
void onError(const HTTPException &error)
size_t sendSettings() override
virtual size_t onIngress(const folly::IOBuf &buf)=0
InfoCallback * infoCallback_
bool isIngressEOMSeen() const
const folly::IOBuf * front() const
folly::Optional< HTTPCodec::StreamID > getAssocTxnId() const
bool egressLimitExceeded() const
virtual size_t generatePingReply(folly::IOBufQueue &, uint64_t)
static const folly::Optional< uint8_t > NoPadding
virtual void onActivateConnection(const HTTPSessionBase &)
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
bool onNativeProtocolUpgradeImpl(HTTPCodec::StreamID txn, std::unique_ptr< HTTPCodec > codec, const std::string &protocolString)
void invokeOnAllTransactions(void(HTTPTransaction::*fn)(Args1...), Args2 &&...args)
virtual bool closeOnEgressComplete() const =0
void errorOnTransactionId(HTTPCodec::StreamID id, HTTPException ex)
virtual StreamID createStream()=0
folly::AsyncTransportWrapper * getTransport() override
virtual HTTPSettings * getEgressSettings()
void setSessionStats(HTTPSessionStats *stats) override
size_t chainLength() const
spdy::GoawayStatusCode statusCode
virtual bool supportsSessionFlowControl() const
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
void onPingReply(uint64_t uniqueID) override
void onPingReplyLatency(int64_t latency) noexceptoverride
void onEgressBuffered() override
bool resetSocketOnShutdown_
HTTPCodecFilterChain codec_
HTTPTransaction * findTransaction(HTTPCodec::StreamID streamID)
void onDeleteAckEvent() noexceptoverride
size_t receiveSessionWindowSize_
uint32_t streamDependency
bool isBusy() const override
void resumeIngress(HTTPTransaction *txn) noexceptoverride
virtual bool supportsPushTransactions() const =0
void onEgressMessageFinished(HTTPTransaction *txn, bool withRST=false)
void setReadBufferLimit(uint32_t limit)
virtual void onPingReplyReceived()
virtual void onWrite(const HTTPSessionBase &, size_t)
virtual void generateHeader(folly::IOBufQueue &writeBuf, StreamID stream, const HTTPMessage &msg, bool eom=false, HTTPHeaderSize *size=nullptr)=0
void writeSuccess() noexceptoverride
const SocketAddress peerAddr
FlowControlFilter * connFlowControl_
virtual void setReceiveWindow(uint32_t capacity)
virtual void onSettingsOutgoingStreamsNotFull(const HTTPSessionBase &)
void onNewTransactionParseError(HTTPCodec::StreamID streamID, const HTTPException &error)
void errorOnAllTransactions(ProxygenError err, const std::string &errorMsg)
void setReceiveWindowSize(folly::IOBufQueue &writeBuf, uint32_t capacity)
std::unique_ptr< folly::IOBuf > getNextToSend(bool *cork, bool *eom)
size_t getCodecSendWindowSize() const
bool hasCodecStatusCode() const
void sendHeaders(HTTPTransaction *txn, const HTTPMessage &headers, HTTPHeaderSize *size, bool includeEOM) noexceptoverride
HTTP2PriorityQueue::NextEgressResult nextEgressResults_
void readErr(const folly::AsyncSocketException &) noexceptoverride
size_t sendCertificateRequest(std::unique_ptr< folly::IOBuf > certificateRequestContext, std::vector< fizz::Extension > extensions) override
WriteTimeout writeTimeout_
virtual void onEgressBuffered(const HTTPSessionBase &)
void writeTimeoutExpired() noexcept
void onCreateTransaction()
void onIngressChunkComplete()
void onWriteError(size_t bytesWritten, const folly::AsyncSocketException &ex)
size_t receiveStreamWindowSize_
void onPingRequest(uint64_t uniqueID) override
void onCertificateRequest(uint16_t requestId, std::unique_ptr< folly::IOBuf > authRequest) override
void dropConnection() override
static uint32_t maxReadBufferSize_
bool isBufferMovable() noexceptoverride
virtual size_t generateChunkTerminator(folly::IOBufQueue &writeBuf, StreamID stream)=0
std::enable_if< std::is_constructible< C >::value >::type addFilters()
virtual void recordSessionStalled() noexcept=0
void reset(bool useFlowControl, uint32_t receiveInitialWindowSize, uint32_t receiveStreamWindowSize, uint32_t sendInitialWindowSize)
constexpr detail::Map< Move > move
virtual void onIngressError(const HTTPSessionBase &, ProxygenError)
bool getCurrentTransportInfoWithoutUpdate(wangle::TransportInfo *tinfo) const override
std::chrono::milliseconds getTimeoutDuration() const
virtual HTTPTransaction::Handler * getTransactionTimeoutHandler(HTTPTransaction *txn)=0
virtual std::chrono::milliseconds getGracefulShutdownTimeout() const
void flowControlTimeoutExpired() noexcept
std::chrono::milliseconds getDefaultTimeout() const
void setNewTransactionPauseState(HTTPCodec::StreamID streamID)
bool writesPaused() const
size_t initialReceiveWindow_
const uint8_t * data() const
void onConnectionSendWindowOpen() override
void setProxygenError(ProxygenError proxygenError)
virtual CodecProtocol getProtocol() const =0
size_t sendWindowUpdate(HTTPTransaction *txn, uint32_t bytes) noexceptoverride
folly::AsyncTransportWrapper::UniquePtr sock_
folly::IOBufQueue readBuf_
void readTimeoutExpired() noexcept
void setMaxConcurrentIncomingStreams(uint32_t num) override
std::unique_ptr< folly::IOBuf > move()
virtual void onIngressEOF()
HTTPSessionStats * getSessionStats() const
virtual void generatePushPromise(folly::IOBufQueue &, StreamID, const HTTPMessage &, StreamID, bool, HTTPHeaderSize *)
virtual void setupOnHeadersComplete(HTTPTransaction *txn, HTTPMessage *msg)=0
HTTP2PriorityQueue txnEgressQueue_
virtual void onCreate(const HTTPSessionBase &)
static http_parser_settings settings
bool isPrioritySampled() const
void setCodecStatusCode(ErrorCode statusCode)
uint32_t maxConcurrentIncomingStreams_
void setByteEventTracker(std::shared_ptr< ByteEventTracker > byteEventTracker)
const folly::SocketAddress & getLocalAddress() const noexceptoverride
std::string describe() const
uint8_t getPriority() const
void updateSessionBytesSheduled(uint64_t bytes)
void onNewOutgoingStream(uint32_t outgoingStreams)
requires E e noexcept(noexcept(s.error(std::move(e))))
void onIngressChunkHeader(size_t length)
uint32_t outgoingStreams_
void decrementTransactionCount(HTTPTransaction *txn, bool ingressEOM, bool egressEOM)
std::set< HTTPCodec::StreamID > getExTransactions() const
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
bool resetAfterDrainingWrites_
virtual size_t generateBody(folly::IOBufQueue &writeBuf, StreamID stream, std::unique_ptr< folly::IOBuf > chain, folly::Optional< uint8_t > padding, bool eom)=0
bool testAndClearActive()
virtual size_t generateRstStream(folly::IOBufQueue &writeBuf, StreamID stream, ErrorCode code)=0
void updatePendingWriteSize(int64_t delta)
void onSetSendWindow(uint32_t windowSize)
void onActivated(ManagedConnection &conn) override
uint32_t maxConcurrentOutgoingStreamsRemote_
virtual size_t generateTrailers(folly::IOBufQueue &writeBuf, StreamID stream, const HTTPHeaders &trailers)=0
size_t sendPing() override
void scheduleTimeout(folly::HHWheelTimer::Callback *callback, std::chrono::milliseconds timeout)
static const uint32_t kMinReadSize
bool writesUnpaused() const
FilterChain< T1, T2, FilterType, set_callback, TakeOwnership > & add(Args &&...args)
virtual void onFullHandshakeCompletion(const HTTPSessionBase &)
const folly::SocketAddress & getPeerAddress() const noexceptoverride
unsigned numActiveWrites_
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
HTTPSessionBase(const folly::SocketAddress &localAddr, const folly::SocketAddress &peerAddr, HTTPSessionController *controller, const wangle::TransportInfo &tinfo, InfoCallback *infoCallback, std::unique_ptr< HTTPCodec > codec)
virtual bool isWaitingToDrain() const =0
const std::set< HTTPCodec::StreamID > & getPushedTransactions() const
void commonEom(HTTPTransaction *txn, size_t encodedSize, bool piggybacked) noexcept
bool maybeResumePausedPipelinedTransaction(size_t oldStreamCount, uint32_t txnSeqn)
WheelTimerInstance timeout_
WriteSegmentList pendingWrites_
std::unique_ptr< ShutdownTransportCallback > shutdownTransportCb_
virtual void onIngressMessage(const HTTPSessionBase &, const HTTPMessage &)
void notifyIngressBodyProcessed(uint32_t bytes) noexceptoverride
void onSetMaxInitiatedStreams(uint32_t maxTxns)
void resumeTransactions()
void handler(int, siginfo_t *, void *)
void readEOF() noexceptoverride
void onIngressTrailers(std::unique_ptr< HTTPHeaders > trailers)
uint64_t bodyBytesPerWriteBuf_
bool getHTTP2PrioritiesEnabled() const override
ConnectionCloseReason getConnectionCloseReason() const
static void handleLastByteEvents(ByteEventTracker *byteEventTracker, HTTPTransaction *txn, size_t encodedSize, size_t byteOffset, bool piggybacked)
void runDestroyCallbacks()
HTTPCodec::StreamID getGracefulGoawayAck() const
bool initWithSocket(const folly::AsyncSocket *sock)
void onGoaway(ErrorCode code)
bool isLoopCallbackScheduled() const
constexpr auto size(C const &c) -> decltype(c.size())
bool onBodyImpl(std::unique_ptr< folly::IOBuf > chain, size_t length, uint16_t padding, HTTPTransaction *txn)
void onDeactivated(ManagedConnection &conn) override
void notifyPendingEgress() noexceptoverride
virtual void onIngressEOF()=0
void onSettingsAck() override
virtual const std::string & getUserAgent() const =0
std::chrono::milliseconds setupTime
HTTPSessionController * getController()
void setTimeoutDuration(std::chrono::milliseconds duration)
void writeBuf(const Buf &buf, folly::io::Appender &out)
virtual void onHeadersSent(const HTTPMessage &, bool)
virtual size_t generatePriority(folly::IOBufQueue &, StreamID, const HTTPMessage::HTTPPriority &)
void describe(std::ostream &os) const override
virtual bool supportsStreamFlowControl() const
int64_t pendingWriteSizeDelta_
uint64_t getNumTxnServed() const
void onWindowUpdate(HTTPCodec::StreamID stream, uint32_t amount) override
static Options cacheChainLength()
void setCloseReason(ConnectionCloseReason reason)
void onIngressWindowUpdate(uint32_t amount)
bool isIngressPaused() const
virtual void onTransactionDetached(const HTTPSessionBase &)
void incrementOutgoingStreams()
virtual void onSettingsOutgoingStreamsFull(const HTTPSessionBase &)
void cancelLoopCallback()
virtual size_t generateCertificate(folly::IOBufQueue &, uint16_t, std::unique_ptr< folly::IOBuf >)
virtual void onSettingsAck(const HTTPSessionBase &)
bool onPushedTransaction(HTTPTransaction *txn)
void handleErrorDirectly(HTTPTransaction *txn, const HTTPException &error)
virtual void generateExHeader(folly::IOBufQueue &, StreamID, const HTTPMessage &, const HTTPCodec::ExAttributes &, bool, HTTPHeaderSize *)
void onSettings(const SettingsList &settings) override
size_t sendEOM(HTTPTransaction *txn, const HTTPHeaders *trailers) noexceptoverride
bool isConnWindowFull() const
folly::IntrusiveListHook listHook
void onMessageComplete(HTTPCodec::StreamID streamID, bool upgrade) override
void setFlowControl(size_t initialReceiveWindow, size_t receiveStreamWindowSize, size_t receiveSessionWindowSize) override
const T * getUnderlyingTransport() const
HTTPTransaction * newExTransaction(HTTPTransaction::Handler *handler, HTTPCodec::StreamID controlStream, bool unidirectional=false) noexceptoverride
std::size_t length() const
HTTPTransactionEgressSM::State getEgressState() const
const std::string & getCodecProtocolString(CodecProtocol proto)
virtual void recordTTLBAIOBSplitByEom() noexcept=0
GuardImpl guard(ErrorHandler &&handler)
http2::PriorityUpdate getMessagePriority(const HTTPMessage *msg)
void onHeadersComplete(HTTPCodec::StreamID streamID, std::unique_ptr< HTTPMessage > msg) override
ErrorCode getCodecStatusCode() const
const char * getErrorCodeString(ErrorCode error)
void onConnectionSendWindowClosed() override
HTTPCodec::StreamID sendPriority(http2::PriorityUpdate pri) override
virtual size_t generateConnectionPreface(folly::IOBufQueue &)
virtual size_t generateSettingsAck(folly::IOBufQueue &)
virtual uint32_t getDefaultWindowSize() const
void errorOnTransactionIds(const std::vector< HTTPCodec::StreamID > &ids, ProxygenError err, const std::string &extraErrorMsg="")
void onAbort(HTTPCodec::StreamID streamID, ErrorCode code) override
virtual size_t generateChunkHeader(folly::IOBufQueue &writeBuf, StreamID stream, size_t length)=0
bool shouldShutdown() const
void setEgressSettings(const SettingsList &inSettings) override
void setCallback(T2 *cb) override
HTTPTransaction * newPushedTransaction(HTTPCodec::StreamID assocStreamId, HTTPTransaction::PushHandler *handler) noexceptoverride
void removePushedTransaction(HTTPCodec::StreamID pushStreamId)
virtual StreamID mapPriorityToDependency(uint8_t) const
std::chrono::microseconds rtt
folly::Optional< HTTPPriority > getHTTP2Priority() const
bool readsShutdown() const
const Handler * getHandler() const
void onChunkHeader(HTTPCodec::StreamID stream, size_t length) override
static const char *const value
virtual void recordSessionReused() noexcept=0
ConnectionManager * getConnectionManager()
uint32_t getSequenceNumber() const
bool writesShutdown() const
virtual void onPingReplySent(int64_t)
uint32_t liveTransactions_
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
folly::WriteFlags getFlags()
virtual size_t generateEOM(folly::IOBufQueue &writeBuf, StreamID stream)=0
virtual void onRead(const HTTPSessionBase &, size_t)
bool ingressLimitExceeded() const
void writeErr(size_t bytesWritten, const folly::AsyncSocketException &) noexceptoverride
void runLoopCallback() noexceptoverride
uint64_t numPendingEgress() const
void invalidStream(HTTPCodec::StreamID stream, ErrorCode code=ErrorCode::_SPDY_INVALID_STREAM)
uint32_t incomingStreams_
virtual bool extraResponseExpected() const
folly::Optional< HTTPCodec::StreamID > getControlStream() const
const PriorityUpdate DefaultPriority
uint32_t getMaxDeferredSize()
std::unique_ptr< T1 > setDestination(std::unique_ptr< T1 > destination)
std::size_t computeChainDataLength() const
std::map< HTTPCodec::StreamID, HTTPTransaction > transactions_
void readDataAvailable(size_t readSize) noexceptoverride
std::shared_ptr< std::string > sslCipher
void notifyPendingShutdown() override
size_t sendPriorityImpl(HTTPCodec::StreamID streamID, http2::PriorityUpdate pri)
std::tuple< uint32_t, bool, uint8_t > HTTPPriority
SteadyClock::time_point TimePoint
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
void readBufferAvailable(std::unique_ptr< folly::IOBuf >) noexceptoverride
HTTPSessionStats * sessionStats_
virtual void onIngressPaused(const HTTPSessionBase &)
std::vector< HTTPSetting > SettingsList
virtual StreamID getLastIncomingStreamID() const
void notifyEgressBodyBuffered(int64_t bytes) noexceptoverride
folly::IOBufQueue writeBuf_
std::list< ReplaySafetyCallback * > waitingForReplaySafety_
void getReadBuffer(void **buf, size_t *bufSize) override
void onReplaySafe() noexceptoverride
void onBody(HTTPCodec::StreamID streamID, std::unique_ptr< folly::IOBuf > chain, uint16_t padding) override
bool isSpdyCodecProtocol(CodecProtocol protocol)
virtual bool allTransactionsStarted() const =0
virtual void resetTimeout()
size_t sendChunkHeader(HTTPTransaction *txn, size_t length) noexceptoverride
void attachToSessionController()
void setSecondAuthManager(std::unique_ptr< SecondaryAuthManager > secondAuthManager)
void onPriority(HTTPCodec::StreamID stream, const HTTPMessage::HTTPPriority &) override
bool verifyCertAuthSetting(uint32_t value)
std::shared_ptr< ByteEventTracker > byteEventTracker_
void onSessionParseError(const HTTPException &error)
void onGoaway(uint64_t lastGoodStreamID, ErrorCode code, std::unique_ptr< folly::IOBuf > debugData=nullptr) override
void setEgressBytesLimit(uint64_t bytesLimit)
void setPrioritySampled(bool sampled)
virtual void onRequestEnd(const HTTPSessionBase &, uint32_t)
virtual void onSettings(const HTTPSessionBase &, const SettingsList &)
void onIngressUpgrade(UpgradeProtocol protocol)
void onIngressSetSendWindow(uint32_t newWindowSize)
std::chrono::time_point< ClockType > getCurrentTime()
uint64_t egressBytesLimit_
size_t sendChunkTerminator(HTTPTransaction *txn) noexceptoverride
const SocketAddress localAddr
void trimStart(size_t amount)
WriteSegment(HTTPSession *session, uint64_t length)
virtual bool isReusable() const =0
bool notifyBodyProcessed(uint32_t bytes)
void iterateBFS(const std::function< bool(HTTP2PriorityQueue &, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
void onEgressBufferCleared() override
void onMessageBegin(HTTPCodec::StreamID streamID, HTTPMessage *msg) override
void nextEgress(NextEgressResult &result, bool spdyMode=false)
uint32_t getAvailableSend() const
DrainTimeout drainTimeout_
void setBufferCallback(BufferCallback *cb)
void onChunkComplete(HTTPCodec::StreamID stream) override
static const folly::Optional< StreamID > NoStream
void addOrUpdatePriorityNode(HTTPCodec::StreamID id, http2::PriorityUpdate pri)
HTTPSession(const WheelTimerInstance &timeout, folly::AsyncTransportWrapper::UniquePtr sock, const folly::SocketAddress &localAddr, const folly::SocketAddress &peerAddr, HTTPSessionController *controller, std::unique_ptr< HTTPCodec > codec, const wangle::TransportInfo &tinfo, InfoCallback *infoCallback)
std::shared_ptr< std::string > appProtocol
virtual size_t generateWindowUpdate(folly::IOBufQueue &, StreamID, uint32_t)
void closeWhenIdle() override
bool isHTTP2CodecProtocol(CodecProtocol protocol)
void updateWriteBufSize(int64_t delta)
void transactionTimeout(HTTPTransaction *txn) noexceptoverride
bool isRemoteInitiated() const
virtual size_t generateSettings(folly::IOBufQueue &)
virtual size_t generateGoaway(folly::IOBufQueue &writeBuf, StreamID lastStream, ErrorCode code, std::unique_ptr< folly::IOBuf > debugData=nullptr)=0
virtual void onEgressBufferCleared(const HTTPSessionBase &)
FlowControlTimeout flowControlTimeout_
HTTPCodec::StreamID getID() const
virtual void onDeactivateConnection(const HTTPSessionBase &)
virtual void setSessionStats(HTTPSessionStats *stats)
bool ingressBytesProcessed(folly::IOBufQueue &writeBuf, uint32_t delta)
uint32_t getCertAuthSettingVal()
void detach(HTTPTransaction *txn) noexceptoverride
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
void dumpConnectionState(uint8_t loglevel) override
void postallocate(std::size_t n)
std::chrono::milliseconds sslSetupTime
bool isDownstream() const
SecondaryAuthManager * getSecondAuthManager() const
const char * what(void) const noexceptoverride
virtual bool isBusy() const =0
void updateContentionsCount(uint64_t contentions)
virtual void setParserPaused(bool paused)=0
void onTrailersComplete(HTTPCodec::StreamID streamID, std::unique_ptr< HTTPHeaders > trailers) override
void removeExTransaction(HTTPCodec::StreamID exStreamId)
static uint32_t egressBodySizeLimit_
virtual size_t generatePingRequest(folly::IOBufQueue &)
void shutdownTransportWithReset(ProxygenError errorCode, const std::string &errorMsg="")
bool isDownstream() const
std::unique_ptr< SecondaryAuthManager > secondAuthManager_
void onPushMessageBegin(HTTPCodec::StreamID streamID, HTTPCodec::StreamID assocStreamID, HTTPMessage *msg) override
virtual bool supportsParallelRequests() const =0
void onError(HTTPCodec::StreamID streamID, const HTTPException &error, bool newTxn) override
virtual void onFlowControlWindowClosed(const HTTPSessionBase &)
bool supportsMoreTransactions() const
bool hasMoreWrites() const
void pauseIngress(HTTPTransaction *txn) noexceptoverride
virtual TransportDirection getTransportDirection() const =0
const HTTPSetting * getSetting(SettingsId id) const
wangle::TransportInfo transportInfo_
uint64_t sessionByteOffset()
std::unique_ptr< folly::IOBuf > pop_front()
folly::HHWheelTimer * getWheelTimer() const
virtual size_t generateCertificateRequest(folly::IOBufQueue &, uint16_t, std::unique_ptr< folly::IOBuf >)
void onWriteSuccess(uint64_t bytesWritten)
void onExMessageBegin(HTTPCodec::StreamID streamID, HTTPCodec::StreamID controlStream, bool unidirectional, HTTPMessage *msg) override
const char * getErrorString(ProxygenError error)
void setSetting(SettingsId id, SettingsValue val)
virtual void onRequestBegin(const HTTPSessionBase &)
static const folly::Optional< ExAttributes > NoExAttributes
void onLastByteEvent(HTTPTransaction *txn, uint64_t offset, bool eomTracked) noexceptoverride