proxygen
HTTPSessionBase.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #pragma once
11 
12 #include <fizz/record/Types.h>
14 #include <folly/io/IOBuf.h>
20 
21 namespace proxygen {
22 class HTTPSessionController;
23 class HTTPSessionStats;
24 class HTTPTransaction;
25 class ByteEventTracker;
26 
28 
30 public:
31  virtual ~HTTPPriorityMapFactoryProvider() = default;
33 };
34 
36  public:
37 
// Kind of session. getType() returns one of these values.
// NOTE(review): HQ presumably denotes HTTP over QUIC — confirm against the
// implementing classes.
38  enum class SessionType {
39  HTTP,
40  HQ
41  };
42 
// Observer interface for session-level events. Every hook has an empty
// default body, so an implementation overrides only the events it cares
// about. Most hooks receive the session that raised the event; a few
// (onIngressEOF, onPingReplySent, onPingReplyReceived) carry no session
// argument.
47  class InfoCallback {
48  public:
49  virtual ~InfoCallback() {}
50 
51  // Note: you must not start any asynchronous work from onCreate()
52  virtual void onCreate(const HTTPSessionBase&) {}
// Ingress-side notifications. The size_t arguments are named in the
// signatures (bytesRead / bytesWritten).
54  virtual void onIngressError(const HTTPSessionBase&, ProxygenError) {}
55  virtual void onIngressEOF() {}
56  virtual void onRead(const HTTPSessionBase&, size_t /*bytesRead*/) {}
57  virtual void onWrite(const HTTPSessionBase&, size_t /*bytesWritten*/) {}
// Request lifecycle notifications.
58  virtual void onRequestBegin(const HTTPSessionBase&) {}
59  virtual void onRequestEnd(const HTTPSessionBase&,
60  uint32_t /*maxIngressQueueSize*/) {}
61  virtual void onActivateConnection(const HTTPSessionBase&) {}
62  virtual void onDeactivateConnection(const HTTPSessionBase&) {}
63  // Note: you must not start any asynchronous work from onDestroy()
64  virtual void onDestroy(const HTTPSessionBase&) {}
65  virtual void onIngressMessage(const HTTPSessionBase&,
66  const HTTPMessage&) {}
67  virtual void onIngressLimitExceeded(const HTTPSessionBase&) {}
68  virtual void onIngressPaused(const HTTPSessionBase&) {}
69  virtual void onTransactionDetached(const HTTPSessionBase&) {}
// Ping notifications; onPingReplySent carries a latency value whose unit is
// not visible here — TODO confirm at the call site.
70  virtual void onPingReplySent(int64_t /*latency*/) {}
71  virtual void onPingReplyReceived() {}
// Egress buffering and SETTINGS notifications.
75  virtual void onEgressBuffered(const HTTPSessionBase&) {}
76  virtual void onEgressBufferCleared(const HTTPSessionBase&) {}
77  virtual void onSettings(const HTTPSessionBase&, const SettingsList&) {}
78  virtual void onSettingsAck(const HTTPSessionBase&) {}
79  };
80 
84  HTTPSessionController* controller,
85  const wangle::TransportInfo& tinfo,
86  InfoCallback* infoCallback,
87  std::unique_ptr<HTTPCodec> codec);
88 
89  virtual ~HTTPSessionBase() {}
90 
// Overwrites the static kDefaultReadBufLimit with `limit`. readBufLimit_ is
// member-initialized from kDefaultReadBufLimit, so the new value is what
// that initializer reads from then on.
// The VLOG(3) line prints limit / 1000 truncated to int, labelled "KB"
// (so the limit is presumably in bytes — TODO confirm).
94  static void setDefaultReadBufferLimit(uint32_t limit) {
95  kDefaultReadBufLimit = limit;
96  VLOG(3) << "read buffer limit: " << int(limit / 1000) << "KB";
97  }
98 
// Overwrites the static maxReadBufferSize_ with `bytes`. This is a
// class-wide setting (static), not a per-session one.
99  static void setMaxReadBufferSize(uint32_t bytes) {
100  maxReadBufferSize_ = bytes;
101  }
102 
108  egressBodySizeLimit_ = limit;
109  }
110 
116  kDefaultWriteBufLimit = max;
117  }
118 
// Replaces the session's InfoCallback pointer. The pointer is stored as-is
// (raw, non-owning as far as this header shows); nullptr is the member's
// default value.
119  void setInfoCallback(InfoCallback* callback) {
120  infoCallback_ = callback;
121  }
122 
124  return infoCallback_;
125  }
126 
// Stores the stats sink pointer in sessionStats_. Virtual, so subclasses may
// override to do more than the plain assignment performed here.
127  virtual void setSessionStats(HTTPSessionStats* stats) {
128  sessionStats_ = stats;
129  }
130 
131  virtual SessionType getType() const noexcept = 0;
132 
133  virtual folly::AsyncTransportWrapper* getTransport() = 0;
134 
135  virtual const folly::AsyncTransportWrapper* getTransport() const = 0;
136 
137  virtual folly::EventBase* getEventBase() const = 0;
138 
143  HTTPTransaction::Handler* getParseErrorHandler(
144  HTTPTransaction* txn, const HTTPException& error);
145 
146  virtual bool hasActiveTransactions() const = 0;
147 
152  return (getNumOutgoingStreams() < getMaxConcurrentOutgoingStreams());
153  }
154 
155  virtual uint32_t getNumOutgoingStreams() const = 0;
156 
157  // SimpleSessionPool
159  return historicalMaxOutgoingStreams_;
160  }
161 
162  virtual uint32_t getNumIncomingStreams() const = 0;
163 
164 
165  virtual uint32_t getMaxConcurrentOutgoingStreamsRemote() const = 0;
166 
168  return std::min(maxConcurrentOutgoingStreamsConfig_,
169  getMaxConcurrentOutgoingStreamsRemote());
170  }
171 
// Returns the current controller pointer (controller_ is private and
// defaults to nullptr).
172  HTTPSessionController* getController() { return controller_; }
173 
175  controller_ = controller;
176 
177  // Controller controlled settings
178  initCodecHeaderIndexingStrategy();
179  }
180 
182  return closeReason_;
183  }
184 
// Perfect-forwards `args` to codec_.add<Filter>(...).
// NOTE(review): presumably this constructs a Filter from `args` and inserts
// it into the codec filter chain — confirm against the chain's add().
185  template<typename Filter, typename... Args>
186  void addCodecFilter(Args&&... args) {
187  codec_.add<Filter>(std::forward<Args>(args)...);
188  }
189 
// Returns the protocol reported by the codec reached through codec_
// (codec_->getProtocol()).
190  virtual CodecProtocol getCodecProtocol() const {
191  return codec_->getProtocol();
192  }
193 
205  virtual void setFlowControl(
206  size_t initialReceiveWindow,
207  size_t receiveStreamWindowSize,
208  size_t receiveSessionWindowSize) = 0;
209 
213  virtual void setEgressSettings(const SettingsList& inSettings) = 0;
214 
// Records whether HTTP/2 priorities are enabled by storing `enabled` in the
// h2PrioritiesEnabled_ bit-field. Only the flag is updated here.
218  void setHTTP2PrioritiesEnabled(bool enabled) /*override*/ {
219  h2PrioritiesEnabled_ = enabled;
220  }
221 
// Returns the flag last written by setHTTP2PrioritiesEnabled().
222  virtual bool getHTTP2PrioritiesEnabled() const {
223  return h2PrioritiesEnabled_;
224  }
225 
232  // TODO: CHECK(started_);
233  maxConcurrentOutgoingStreamsConfig_ = num;
234  }
235 
239  virtual void setMaxConcurrentIncomingStreams(uint32_t num) = 0;
240 
246  return writeBufLimit_;
247  }
248 
250  writeBufLimit_ = limit;
251  VLOG(4) << "write buffer limit: " << int(limit / 1000) << "KB";
252  }
253 
255  readBufLimit_ = limit;
256  }
257 
263  virtual void startNow() = 0;
264 
268  virtual size_t sendSettings() = 0;
269 
275  virtual size_t sendPing() = 0;
276 
283  /*override*/ = 0;
284 
289  virtual size_t sendPriority(HTTPCodec::StreamID id,
290  http2::PriorityUpdate pri) = 0;
291 
// Default implementation: ignores both arguments and returns 0. Subclasses
// that support certificate requests are expected to override it.
// NOTE(review): the return value is presumably the number of bytes written,
// in line with the other send* methods — confirm.
296  virtual size_t sendCertificateRequest(
297  std::unique_ptr<folly::IOBuf> /* certificateRequestContext */,
298  std::vector<fizz::Extension> /* extensions */) {
299  return 0;
300  }
301 
303  return transactionSeqNo_;
304  }
305 
// Returns latestIdleDuration_, in whole seconds.
// Debug-only preconditions: at least one transaction has been counted
// (transactionSeqNo_ > 0) and latestActive_ is later than TimePoint::min().
// These are DCHECKs, so release builds return the stored value unchecked.
306  std::chrono::seconds getLatestIdleTime() const {
307  DCHECK_GT(transactionSeqNo_, 0) << "No idle time for the first transaction";
308  DCHECK(latestActive_ > TimePoint::min());
309  return latestIdleDuration_;
310  }
311 
// Stores `sampled` in the prioritySample_ bit-field; read back through
// isPrioritySampled().
312  void setPrioritySampled(bool sampled) {
313  prioritySample_ = sampled;
314  }
315 
316  // public HTTPTransaction::Transport overrides
318  const noexcept /*override*/ {
319  return localAddr_;
320  }
322  const noexcept /*override*/ {
323  return peerAddr_;
324  }
326  /*override*/ {
327  return transportInfo_;
328  }
329  virtual bool getCurrentTransportInfo(
330  wangle::TransportInfo* tinfo) /*override*/ = 0;
331 
332  virtual bool getCurrentTransportInfoWithoutUpdate(
333  wangle::TransportInfo* tinfo) const = 0;
334 
335  virtual void setHeaderCodecStats(HeaderCodec::Stats* stats) = 0;
336 
337  virtual void enableDoubleGoawayDrain() = 0;
338 
340  return transportInfo_;
341  }
342 
346  virtual bool connCloseByRemote() = 0;
347 
348  // Upstream API
349 
350  // The interfaces defined below update the virtual stream based priority
351  // scheme from the current system which allows only strict priorities to a
352  // flexible system allowing an arbitrary tree of virtual streams, subject only
353  // to the limitations in the HTTP/2 specification. An arbitrary prioritization
354  // scheme can be implemented by constructing virtual streams corresponding to
355  // the desired prioritization and attaching new streams as dependencies of the
356  // appropriate virtual stream.
357  //
358  // The user must define a map from an opaque integer priority level to an
359  // HTTP/2 priority corresponding to the virtual stream. This map is
360  // implemented by the user in a class that extends
361  // HTTPUpstreamSession::PriorityMapFactory. A shared pointer to this class is
362  // passed into the constructor of HTTPUpstreamSession. This method will send
363  // the virtual streams and return a unique pointer to a class that extends
364  // HTTPUpstreamSession::PriorityAdapter. This class implements the map between
365  // the user defined priority level and the HTTP/2 priority level.
366  //
367  // When the session is started, the createVirtualStreams method of
368  // PriorityMapFactory is called by HTTPUpstreamSession::startNow. The returned
369  // pointer to the PriorityAdapter map is cached in HTTPUpstreamSession. The
370  // HTTP/2 priority that should be used for a new stream dependent on a virtual
371  // stream corresponding to a given priority level is then retrieved by calling
372  // the HTTPUpstreamSession::getHTTPPriority(uint8_t level) method.
373  //
374  // The prior strict priority system has been left in place for now, but if
375  // both maxLevels and PriorityMapFactory are passed into the
376  // HTTPUpstreamSession constructor, the maxLevels parameter will be ignored.
377 
378  // Implements a map from generic priority level to HTTP/2 priority.
380  public:
382  getHTTPPriority(uint8_t level) = 0;
383  virtual ~PriorityAdapter() = default;
384  };
385 
387  public:
388  // Creates the map implemented by PriorityAdapter, sends the corresponding
389  // virtual stream on the given session, and returns the map.
390  virtual std::unique_ptr<PriorityAdapter> createVirtualStreams(
391  HTTPPriorityMapFactoryProvider* session) const = 0;
392  virtual ~PriorityMapFactory() = default;
393  };
394 
395  using FilterIteratorFn = std::function<void(HTTPCodecFilter*)>;
396 
397  virtual bool isDetachable(bool checkSocket) const = 0;
398 
399  virtual void attachThreadLocals(
400  folly::EventBase* eventBase,
401  folly::SSLContextPtr sslContext,
402  const WheelTimerInstance& timeout,
403  HTTPSessionStats* stats,
404  FilterIteratorFn fn,
405  HeaderCodec::Stats* headerCodecStats,
406  HTTPSessionController* controller) = 0;
407 
408  virtual void detachThreadLocals(bool detachSSLContext=false) = 0;
409 
417  virtual HTTPTransaction* newTransaction(
419 
420  virtual bool isReplaySafe() const = 0;
421 
426  virtual bool isReusable() const = 0;
427 
431  virtual bool isClosing() const = 0;
432 
438  virtual void drain() = 0;
439 
440  virtual folly::Optional<const HTTPMessage::HTTPPriority> getHTTPPriority(
441  uint8_t level) = 0;
442 
446  void enableExHeadersSettings() noexcept;
447 
// Returns the exHeadersEnabled_ flag. Note the method is not declared
// const even though it only reads the flag.
448  bool isExHeadersEnabled() noexcept {
449  return exHeadersEnabled_;
450  }
451 
452  protected:
457  void handleErrorDirectly(HTTPTransaction* txn,
458  const HTTPException& error);
459 
460  bool onBodyImpl(std::unique_ptr<folly::IOBuf> chain, size_t length,
461  uint16_t padding, HTTPTransaction* txn);
462 
463  bool notifyBodyProcessed(uint32_t bytes);
464 
466  latestActive_ = getCurrentTime();
467  }
468 
// True when pendingReadSize_ is strictly greater than readBufLimit_;
// being exactly at the limit does not count as exceeded.
469  bool ingressLimitExceeded() const {
470  return pendingReadSize_ > readBufLimit_;
471  }
472 
// True when pendingWriteSize_ is strictly greater than writeBufLimit_.
// pendingWriteSize_ is 64-bit and writeBufLimit_ is 32-bit, so the
// comparison is carried out in 64 bits.
476  bool egressLimitExceeded() const {
477  // Strict comparison: sitting exactly at the limit is not "exceeded".
478  return pendingWriteSize_ > writeBufLimit_;
479  }
480 
482  DCHECK(delta >= 0 || uint64_t(-delta) <= pendingWriteSize_);
483  pendingWriteSize_ += delta;
484  }
485 
487  if (transactionSeqNo_ >= 1) {
488  // idle duration only exists since the 2nd transaction in the session
489  latestIdleDuration_ = secondsSince(latestActive_);
490  }
491  }
492 
// Advances the per-session transaction counter (transactionSeqNo_) by one.
493  void incrementSeqNo() {
494  ++transactionSeqNo_;
495  }
496 
// Returns the flag stored by setPrioritySampled().
497  bool isPrioritySampled() const {
498  return prioritySample_;
499  }
500 
// Keeps historicalMaxOutgoingStreams_ as a running maximum: it is raised to
// `outgoingStreams` when that is larger and never lowered here.
501  void onNewOutgoingStream(uint32_t outgoingStreams) {
502  if (outgoingStreams > historicalMaxOutgoingStreams_) {
503  historicalMaxOutgoingStreams_ = outgoingStreams;
504  }
505  }
506 
508  if (closeReason_ == ConnectionCloseReason::kMAX_REASON) {
509  closeReason_ = reason;
510  }
511  }
512 
513  static void handleLastByteEvents(
514  ByteEventTracker* byteEventTracker,
515  HTTPTransaction* txn,
516  size_t encodedSize,
517  size_t byteOffset,
518  bool piggybacked);
519 
520  void runDestroyCallbacks();
521 
522  /*
523  * Invoked by children upon updating the actual codec wrapped by the filter
524  * chain.
525  */
526  void onCodecChanged();
527 
534  void initCodecHeaderIndexingStrategy();
535 
539  void attachToSessionController();
540 
541  HTTPSessionStats* sessionStats_{nullptr};
542 
543  InfoCallback* infoCallback_{nullptr}; // maybe can move to protected
544 
546 
548 
554 
559 
570 
573 
576 
577  private:
578  // Underlying controller_ is marked as private so that callers must go
579  // through the getController/setController accessors. This ensures we have a
580  // single path to update controller_.
581  HTTPSessionController* controller_{nullptr};
582 
583  // private ManagedConnection methods
// Overrides the base-class getIdleTime(). Returns the milliseconds elapsed
// since latestActive_, or zero when timePointInitialized(latestActive_) is
// false. latestActive_ is value-initialized (TimePoint latestActive_{}).
584  std::chrono::milliseconds getIdleTime() const override {
585  if (timePointInitialized(latestActive_)) {
586  return millisecondsSince(latestActive_);
587  } else {
// No activity timestamp recorded, so report zero idle time.
588  return std::chrono::milliseconds(0);
589  }
590  }
591 
595  TimePoint latestActive_{};
596 
600  std::chrono::seconds latestIdleDuration_{};
601 
603  uint32_t transactionSeqNo_{0};
604 
608  ConnectionCloseReason closeReason_
610 
614  uint32_t historicalMaxOutgoingStreams_{0};
615 
620  uint32_t maxConcurrentOutgoingStreamsConfig_{
621  kDefaultMaxConcurrentOutgoingStreams};
622 
623 
630  uint32_t readBufLimit_{kDefaultReadBufLimit};
631  uint32_t writeBufLimit_{kDefaultWriteBufLimit};
632 
637  uint64_t pendingWriteSize_{0};
638 
643  uint32_t pendingReadSize_{0};
644 
645  bool prioritySample_:1;
646  bool h2PrioritiesEnabled_:1;
647 
651  bool exHeadersEnabled_:1;
652 };
653 
654 }
ConnectionCloseReason
Definition: HTTPConstants.h:51
static uint32_t kDefaultReadBufLimit
bool egressLimitExceeded() const
void setWriteBufferLimit(uint32_t limit)
virtual void onActivateConnection(const HTTPSessionBase &)
folly::SocketAddress localAddr_
virtual void onIngressLimitExceeded(const HTTPSessionBase &)
HTTPCodecFilterChain codec_
void setReadBufferLimit(uint32_t limit)
virtual void onWrite(const HTTPSessionBase &, size_t)
LogLevel max
Definition: LogLevel.cpp:31
const SocketAddress peerAddr
Definition: TestUtils.cpp:20
static void setFlowControlledBodySizeLimit(uint32_t limit)
virtual void onSettingsOutgoingStreamsNotFull(const HTTPSessionBase &)
virtual void onEgressBuffered(const HTTPSessionBase &)
void setMaxConcurrentOutgoingStreams(uint32_t num)
static uint32_t maxReadBufferSize_
virtual void onIngressError(const HTTPSessionBase &, ProxygenError)
EventBase * getEventBase()
CodecFactory codec
virtual void onCreate(const HTTPSessionBase &)
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
std::unique_ptr< Codec > codec_
void onNewOutgoingStream(uint32_t outgoingStreams)
requires E e noexcept(noexcept(s.error(std::move(e))))
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
Definition: error.h:48
void updatePendingWriteSize(int64_t delta)
static void setDefaultReadBufferLimit(uint32_t limit)
virtual void onFullHandshakeCompletion(const HTTPSessionBase &)
virtual void onIngressMessage(const HTTPSessionBase &, const HTTPMessage &)
static void setMaxReadBufferSize(uint32_t bytes)
void handler(int, siginfo_t *, void *)
ConnectionCloseReason getConnectionCloseReason() const
std::function< void(HTTPCodecFilter *)> FilterIteratorFn
HTTPSessionController * getController()
virtual bool getHTTP2PrioritiesEnabled() const
void addCodecFilter(Args &&...args)
uint32_t getMaxConcurrentOutgoingStreams() const
virtual ~HTTPPriorityMapFactoryProvider()=default
uint32_t getWriteBufferLimit() const
LogLevel min
Definition: LogLevel.cpp:30
uint64_t getNumTxnServed() const
void setCloseReason(ConnectionCloseReason reason)
virtual void onTransactionDetached(const HTTPSessionBase &)
const wangle::TransportInfo & getSetupTransportInfo() const noexcept
virtual void onSettingsOutgoingStreamsFull(const HTTPSessionBase &)
virtual void onSettingsAck(const HTTPSessionBase &)
std::shared_ptr< SSLContext > SSLContextPtr
Definition: SSLContext.h:628
void setPrioritySampled(bool sampled)
void setHTTP2PrioritiesEnabled(bool enabled)
virtual CodecProtocol getCodecProtocol() const
virtual void onRead(const HTTPSessionBase &, size_t)
bool ingressLimitExceeded() const
uint32_t getHistoricalMaxOutgoingStreams() const
SteadyClock::time_point TimePoint
Definition: Time.h:25
virtual void onIngressPaused(const HTTPSessionBase &)
std::vector< HTTPSetting > SettingsList
Definition: HTTPSettings.h:81
std::chrono::milliseconds millisecondsSince(std::chrono::time_point< ClockType > t)
Definition: Time.h:101
void setController(HTTPSessionController *controller)
std::chrono::seconds getLatestIdleTime() const
virtual void onRequestEnd(const HTTPSessionBase &, uint32_t)
virtual void onSettings(const HTTPSessionBase &, const SettingsList &)
const
Definition: upload.py:398
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
const SocketAddress localAddr
Definition: TestUtils.cpp:19
InfoCallback * getInfoCallback() const
uint64_t StreamID
Definition: HTTPCodec.h:49
constexpr uint32_t kDefaultMaxConcurrentOutgoingStreams
void setInfoCallback(InfoCallback *callback)
static void setDefaultWriteBufferLimit(uint32_t max)
virtual HTTPCodec::StreamID sendPriority(http2::PriorityUpdate pri)=0
folly::SocketAddress peerAddr_
virtual void onEgressBufferCleared(const HTTPSessionBase &)
static uint32_t kDefaultWriteBufLimit
virtual void onDeactivateConnection(const HTTPSessionBase &)
virtual void onDestroy(const HTTPSessionBase &)
virtual void setSessionStats(HTTPSessionStats *stats)
const folly::SocketAddress & getPeerAddress() const noexcept
const folly::SocketAddress & getLocalAddress() const noexcept
wangle::TransportInfo & getSetupTransportInfo() noexcept
bool timePointInitialized(const T &time)
Definition: Time.h:35
static uint32_t egressBodySizeLimit_
virtual size_t sendCertificateRequest(std::unique_ptr< folly::IOBuf >, std::vector< fizz::Extension >)
virtual void onFlowControlWindowClosed(const HTTPSessionBase &)
bool supportsMoreTransactions() const
std::chrono::milliseconds getIdleTime() const override
wangle::TransportInfo transportInfo_
std::chrono::seconds secondsSince(std::chrono::time_point< ClockType > t)
Definition: Time.h:107
virtual void onRequestBegin(const HTTPSessionBase &)