proxygen
AsyncSocket.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <folly/Optional.h>
19 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
29 
30 #include <sys/types.h>
31 
32 #include <chrono>
33 #include <map>
34 #include <memory>
35 
36 namespace folly {
37 
66 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
67 #define SO_NO_TRANSPARENT_TLS 200
68 #endif
69 
70 #if defined __linux__ && !defined SO_NO_TSOCKS
71 #define SO_NO_TSOCKS 201
72 #endif
73 
74 #ifdef _MSC_VER
75 // We do a dynamic_cast on this, in
76 // AsyncTransportWrapper::getUnderlyingTransport so be safe and
77 // force displacements for it. See:
78 // https://msdn.microsoft.com/en-us/library/7sf3txa8.aspx
79 #pragma vtordisp(push, 2)
80 #endif
81 class AsyncSocket : virtual public AsyncTransportWrapper {
82  public:
83  typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
84 
86  public:
87  virtual ~ConnectCallback() = default;
88 
93  virtual void connectSuccess() noexcept = 0;
94 
100  virtual void connectErr(const AsyncSocketException& ex) noexcept = 0;
101  };
102 
104  public:
105  virtual ~EvbChangeCallback() = default;
106 
107  // Called when the socket has been attached to a new EVB
108  // and is called from within that EVB thread
109  virtual void evbAttached(AsyncSocket* socket) = 0;
110 
111  // Called when the socket is detached from an EVB and
112  // is called from the EVB thread being detached
113  virtual void evbDetached(AsyncSocket* socket) = 0;
114  };
115 
121  public:
122  virtual ~ErrMessageCallback() = default;
123 
132  virtual void errMessage(const cmsghdr& cmsg) noexcept = 0;
133 
140  virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
141  };
142 
144  public:
145  virtual ~SendMsgParamsCallback() = default;
146 
156  int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
157  return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
158  }
159 
170  virtual void getAncillaryData(
171  folly::WriteFlags /*flags*/,
172  void* /*data*/) noexcept {}
173 
181  folly::WriteFlags /*flags*/) noexcept {
182  return 0;
183  }
184 
185  static const size_t maxAncillaryDataSize{0x5000};
186 
187  private:
201  virtual int getFlagsImpl(folly::WriteFlags /*flags*/, int defaultFlags) {
202  return defaultFlags;
203  }
204 
211  int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
212  };
213 
214  explicit AsyncSocket();
220  explicit AsyncSocket(EventBase* evb);
221 
222  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
223 
232  AsyncSocket(
233  EventBase* evb,
234  const folly::SocketAddress& address,
235  uint32_t connectTimeout = 0);
236 
246  AsyncSocket(
247  EventBase* evb,
248  const std::string& ip,
249  uint16_t port,
250  uint32_t connectTimeout = 0);
251 
265  AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId = 0);
266 
274 
281  static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb) {
282  return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb), Destructor());
283  }
284 
288  static std::shared_ptr<AsyncSocket> newSocket(
289  EventBase* evb,
290  const folly::SocketAddress& address,
291  uint32_t connectTimeout = 0) {
292  return std::shared_ptr<AsyncSocket>(
293  new AsyncSocket(evb, address, connectTimeout), Destructor());
294  }
295 
299  static std::shared_ptr<AsyncSocket> newSocket(
300  EventBase* evb,
301  const std::string& ip,
302  uint16_t port,
303  uint32_t connectTimeout = 0) {
304  return std::shared_ptr<AsyncSocket>(
305  new AsyncSocket(evb, ip, port, connectTimeout), Destructor());
306  }
307 
311  static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb, int fd) {
312  return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb, fd), Destructor());
313  }
314 
323  void destroy() override;
324 
328  EventBase* getEventBase() const override {
329  return eventBase_;
330  }
331 
335  virtual int getFd() const {
336  return fd_;
337  }
338 
353  virtual int detachFd();
354 
360  class OptionKey {
361  public:
362  bool operator<(const OptionKey& other) const {
363  if (level == other.level) {
364  return optname < other.optname;
365  }
366  return level < other.level;
367  }
368  int apply(int fd, int val) const {
369  return setsockopt(fd, level, optname, &val, sizeof(val));
370  }
371  int level;
372  int optname;
373  };
374 
375  // Maps from a socket option key to its value
376  typedef std::map<OptionKey, int> OptionMap;
377 
378  static const OptionMap emptyOptionMap;
379  static const folly::SocketAddress& anyAddress();
380 
391  virtual void connect(
392  ConnectCallback* callback,
393  const folly::SocketAddress& address,
394  int timeout = 0,
395  const OptionMap& options = emptyOptionMap,
396  const folly::SocketAddress& bindAddr = anyAddress()) noexcept;
397 
398  void connect(
399  ConnectCallback* callback,
400  const std::string& ip,
401  uint16_t port,
402  int timeout = 0,
403  const OptionMap& options = emptyOptionMap) noexcept;
404 
412  void cancelConnect();
413 
429  void setSendTimeout(uint32_t milliseconds) override;
430 
437  uint32_t getSendTimeout() const override {
438  return sendTimeout_;
439  }
440 
451  void setMaxReadsPerEvent(uint16_t maxReads) {
452  maxReadsPerEvent_ = maxReads;
453  }
454 
464  return maxReadsPerEvent_;
465  }
466 
476  virtual void setErrMessageCB(ErrMessageCallback* callback);
477 
483  virtual ErrMessageCallback* getErrMessageCallback() const;
484 
490  virtual void setSendMsgParamCB(SendMsgParamsCallback* callback);
491 
497  virtual SendMsgParamsCallback* getSendMsgParamsCB() const;
498 
499  // Read and write methods
500  void setReadCB(ReadCallback* callback) override;
501  ReadCallback* getReadCallback() const override;
502 
503  bool setZeroCopy(bool enable);
504  bool getZeroCopy() const {
505  return zeroCopyEnabled_;
506  }
507 
509  return zeroCopyBufId_;
510  }
511 
514  }
515 
516  void setZeroCopyReenableThreshold(size_t threshold);
517 
518  void write(
519  WriteCallback* callback,
520  const void* buf,
521  size_t bytes,
522  WriteFlags flags = WriteFlags::NONE) override;
523  void writev(
524  WriteCallback* callback,
525  const iovec* vec,
526  size_t count,
527  WriteFlags flags = WriteFlags::NONE) override;
528  void writeChain(
529  WriteCallback* callback,
530  std::unique_ptr<folly::IOBuf>&& buf,
531  WriteFlags flags = WriteFlags::NONE) override;
532 
533  class WriteRequest;
534  virtual void writeRequest(WriteRequest* req);
536  handleWrite();
537  }
538 
539  // Methods inherited from AsyncTransport
540  void close() override;
541  void closeNow() override;
542  void closeWithReset() override;
543  void shutdownWrite() override;
544  void shutdownWriteNow() override;
545 
546  bool readable() const override;
547  bool writable() const override;
548  bool isPending() const override;
549  virtual bool hangup() const;
550  bool good() const override;
551  bool error() const override;
552  void attachEventBase(EventBase* eventBase) override;
553  void detachEventBase() override;
554  bool isDetachable() const override;
555 
556  void getLocalAddress(folly::SocketAddress* address) const override;
557  void getPeerAddress(folly::SocketAddress* address) const override;
558 
559  bool isEorTrackingEnabled() const override {
560  return trackEor_;
561  }
562 
563  void setEorTracking(bool track) override {
564  trackEor_ = track;
565  }
566 
567  bool connecting() const override {
568  return (state_ == StateEnum::CONNECTING);
569  }
570 
571  virtual bool isClosedByPeer() const {
572  return (
574  (readErr_ == READ_EOF || readErr_ == READ_ERROR));
575  }
576 
577  virtual bool isClosedBySelf() const {
578  return (
580  (readErr_ != READ_EOF && readErr_ != READ_ERROR));
581  }
582 
583  size_t getAppBytesWritten() const override {
584  return appBytesWritten_;
585  }
586 
587  size_t getRawBytesWritten() const override {
588  return getAppBytesWritten();
589  }
590 
591  size_t getAppBytesReceived() const override {
592  return appBytesReceived_;
593  }
594 
595  size_t getRawBytesReceived() const override {
596  return getAppBytesReceived();
597  }
598 
599  std::chrono::nanoseconds getConnectTime() const {
601  }
602 
603  std::chrono::milliseconds getConnectTimeout() const {
604  return connectTimeout_;
605  }
606 
607  std::chrono::steady_clock::time_point getConnectStartTime() const {
608  return connectStartTime_;
609  }
610 
611  std::chrono::steady_clock::time_point getConnectEndTime() const {
612  return connectEndTime_;
613  }
614 
615  bool getTFOAttempted() const {
616  return tfoAttempted_;
617  }
618 
625  bool getTFOFinished() const {
626  return tfoFinished_;
627  }
628 
641  bool getTFOSucceded() const;
642 
643  // Methods controlling socket options
644 
660  int setNoDelay(bool noDelay);
661 
666  void setCloseOnExec();
667 
668  /*
669  * Set the Flavor of Congestion Control to be used for this Socket
670  * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
671  * first to make sure the module is available for plugging in
672  * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control
673  */
674  int setCongestionFlavor(const std::string& cname);
675 
676  /*
677  * Forces ACKs to be sent immediately
678  *
679  * @return Returns 0 if the TCP_QUICKACK flag was successfully updated,
680  * or a non-zero errno value on error.
681  */
682  int setQuickAck(bool quickack);
683 
687  int setSendBufSize(size_t bufsize);
688 
692  int setRecvBufSize(size_t bufsize);
693 
698 #define SO_SET_NAMESPACE 41
699  int setTCPProfile(int profd);
700 
713  template <typename T>
714  int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) {
715  return getsockopt(fd_, level, optname, (void*)optval, optlen);
716  }
717 
726  template <typename T>
727  int setSockOpt(int level, int optname, const T* optval) {
728  return setsockopt(fd_, level, optname, optval, sizeof(T));
729  }
730 
742  virtual int
743  getSockOptVirtual(int level, int optname, void* optval, socklen_t* optlen) {
744  return getsockopt(fd_, level, optname, optval, optlen);
745  }
746 
758  virtual int setSockOptVirtual(
759  int level,
760  int optname,
761  void const* optval,
762  socklen_t optlen) {
763  return setsockopt(fd_, level, optname, optval, optlen);
764  }
765 
770  virtual void setPreReceivedData(std::unique_ptr<IOBuf> data) {
771  if (preReceivedData_) {
772  preReceivedData_->prependChain(std::move(data));
773  } else {
774  preReceivedData_ = std::move(data);
775  }
776  }
777 
782  void enableTFO() {
783  // No-op if folly does not allow tfo
784 #if FOLLY_ALLOW_TFO
785  tfoEnabled_ = true;
786 #endif
787  }
788 
790  noTransparentTls_ = true;
791  }
792 
793  void disableTSocks() {
794  noTSocks_ = true;
795  }
796 
797  enum class StateEnum : uint8_t {
798  UNINIT,
799  CONNECTING,
800  ESTABLISHED,
801  CLOSED,
802  ERROR,
803  FAST_OPEN,
804  };
805 
807 
808  // Callers should set this prior to connecting the socket for the safest
809  // behavior.
810  void setEvbChangedCallback(std::unique_ptr<EvbChangeCallback> cb) {
811  evbChangeCb_ = std::move(cb);
812  }
813 
819  void cacheAddresses();
820 
825  bool isZeroCopyWriteInProgress() const noexcept;
826 
832 
834  std::unique_ptr<const AsyncTransportCertificate> cert) {
835  peerCertData_ = std::move(cert);
836  }
838  return peerCertData_.get();
839  }
840 
842  std::unique_ptr<const AsyncTransportCertificate> cert) {
843  selfCertData_ = std::move(cert);
844  }
845 
847  return selfCertData_.get();
848  }
849 
857  struct WriteResult {
858  explicit WriteResult(ssize_t ret) : writeReturn(ret) {}
859 
860  WriteResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
861  : writeReturn(ret), exception(std::move(e)) {}
862 
863  ssize_t writeReturn;
864  std::unique_ptr<const AsyncSocketException> exception;
865  };
866 
875  struct ReadResult {
876  explicit ReadResult(ssize_t ret) : readReturn(ret) {}
877 
878  ReadResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
879  : readReturn(ret), exception(std::move(e)) {}
880 
881  ssize_t readReturn;
882  std::unique_ptr<const AsyncSocketException> exception;
883  };
884 
888  class WriteRequest {
889  public:
891  : socket_(socket), callback_(callback) {}
892 
893  virtual void start() {}
894 
895  virtual void destroy() = 0;
896 
897  virtual WriteResult performWrite() = 0;
898 
899  virtual void consume() = 0;
900 
901  virtual bool isComplete() = 0;
902 
904  return next_;
905  }
906 
908  return callback_;
909  }
910 
912  return totalBytesWritten_;
913  }
914 
916  assert(next_ == nullptr);
917  next_ = next;
918  }
919 
920  void fail(const char* fn, const AsyncSocketException& ex) {
921  socket_->failWrite(fn, ex);
922  }
923 
924  void bytesWritten(size_t count) {
925  totalBytesWritten_ += uint32_t(count);
926  socket_->appBytesWritten_ += count;
927  }
928 
929  protected:
930  // protected destructor, to ensure callers use destroy()
931  virtual ~WriteRequest() {}
932 
934  WriteRequest* next_{nullptr};
936  uint32_t totalBytesWritten_{0};
937  };
938 
939  protected:
941  READ_EOF = 0,
945  };
946 
949  };
950 
958  ~AsyncSocket() override;
959 
960  friend std::ostream& operator<<(std::ostream& os, const StateEnum& state);
961 
966  SHUT_WRITE = 0x02,
980  SHUT_READ = 0x04,
981  };
982 
983  class BytesWriteRequest;
984 
985  class WriteTimeout : public AsyncTimeout {
986  public:
988  : AsyncTimeout(eventBase), socket_(socket) {}
989 
990  void timeoutExpired() noexcept override {
991  socket_->timeoutExpired();
992  }
993 
994  private:
996  };
997 
998  class IoHandler : public EventHandler {
999  public:
1001  : EventHandler(eventBase, -1), socket_(socket) {}
1002  IoHandler(AsyncSocket* socket, EventBase* eventBase, int fd)
1003  : EventHandler(eventBase, fd), socket_(socket) {}
1004 
1005  void handlerReady(uint16_t events) noexcept override {
1006  socket_->ioReady(events);
1007  }
1008 
1009  private:
1011  };
1012 
1013  void init();
1014 
1016  public:
1017  explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {}
1018  void runLoopCallback() noexcept override {
1020  socket_->checkForImmediateRead();
1021  }
1022 
1023  private:
1025  };
1026 
1031  void scheduleImmediateRead() noexcept {
1032  if (good()) {
1034  }
1035  }
1036 
1040  void scheduleInitialReadWrite() noexcept {
1041  if (good()) {
1042  DestructorGuard dg(this);
1043  eventBase_->runInLoop([this, dg] {
1044  if (good()) {
1046  }
1047  });
1048  }
1049  }
1050 
1051  // event notification methods
1052  void ioReady(uint16_t events) noexcept;
1053  virtual void checkForImmediateRead() noexcept;
1054  virtual void handleInitialReadWrite() noexcept;
1055  virtual void prepareReadBuffer(void** buf, size_t* buflen);
1056  virtual size_t handleErrMessages() noexcept;
1057  virtual void handleRead() noexcept;
1058  virtual void handleWrite() noexcept;
1059  virtual void handleConnect() noexcept;
1060  void timeoutExpired() noexcept;
1061 
1070  virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset);
1071 
1081  void writeChainImpl(
1082  WriteCallback* callback,
1083  iovec* vec,
1084  size_t count,
1085  std::unique_ptr<folly::IOBuf>&& buf,
1086  WriteFlags flags);
1087 
1105  void writeImpl(
1106  WriteCallback* callback,
1107  const iovec* vec,
1108  size_t count,
1109  std::unique_ptr<folly::IOBuf>&& buf,
1110  WriteFlags flags = WriteFlags::NONE);
1111 
1127  virtual WriteResult performWrite(
1128  const iovec* vec,
1129  uint32_t count,
1130  WriteFlags flags,
1131  uint32_t* countWritten,
1132  uint32_t* partialWritten);
1133 
1141  sendSocketMessage(int fd, struct msghdr* msg, int msg_flags);
1142 
1143  virtual ssize_t tfoSendMsg(int fd, struct msghdr* msg, int msg_flags);
1144 
1145  int socketConnect(const struct sockaddr* addr, socklen_t len);
1146 
1147  virtual void scheduleConnectTimeout();
1148  void registerForConnectEvents();
1149 
1150  bool updateEventRegistration();
1151 
1163  bool updateEventRegistration(uint16_t enable, uint16_t disable);
1164 
1165  // Actually close the file descriptor and set it to -1 so we don't
1166  // accidentally close it again.
1167  void doClose();
1168 
1169  // error handling methods
1170  void startFail();
1171  void finishFail();
1172  void finishFail(const AsyncSocketException& ex);
1173  void invokeAllErrors(const AsyncSocketException& ex);
1174  void fail(const char* fn, const AsyncSocketException& ex);
1175  void failConnect(const char* fn, const AsyncSocketException& ex);
1176  void failRead(const char* fn, const AsyncSocketException& ex);
1177  void failErrMessageRead(const char* fn, const AsyncSocketException& ex);
1178  void failWrite(
1179  const char* fn,
1180  WriteCallback* callback,
1181  size_t bytesWritten,
1182  const AsyncSocketException& ex);
1183  void failWrite(const char* fn, const AsyncSocketException& ex);
1184  void failAllWrites(const AsyncSocketException& ex);
1185  virtual void invokeConnectErr(const AsyncSocketException& ex);
1186  virtual void invokeConnectSuccess();
1187  void invalidState(ConnectCallback* callback);
1188  void invalidState(ErrMessageCallback* callback);
1189  void invalidState(ReadCallback* callback);
1190  void invalidState(WriteCallback* callback);
1191 
1193 
1194  void cacheLocalAddress() const;
1195  void cachePeerAddress() const;
1196 
1197  bool isZeroCopyRequest(WriteFlags flags);
1198 
1199  bool isZeroCopyMsg(const cmsghdr& cmsg) const;
1200  void processZeroCopyMsg(const cmsghdr& cmsg);
1201 
1203  return zeroCopyBufId_++;
1204  }
1206  void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
1208  void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
1209  bool containsZeroCopyBuf(folly::IOBuf* ptr);
1210  void releaseZeroCopyBuf(uint32_t id);
1211 
1212  // a folly::IOBuf can be used in multiple partial requests
1213  // there is a that maps a buffer id to a raw folly::IOBuf ptr
1214  // and another one that adds a ref count for a folly::IOBuf that is either
1215  // the original ptr or nullptr
1217 
1218  struct IOBufInfo {
1219  uint32_t count_{0};
1220  std::unique_ptr<folly::IOBuf> buf_;
1221  };
1222 
1223  std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
1224  std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
1225 
1229  int fd_;
1235 
1236  bool isBufferMovable_{false};
1237 
1239 
1244 
1252  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
1255 
1256  // Pre-received data, to be returned to read callback before any data from the
1257  // socket.
1258  std::unique_ptr<IOBuf> preReceivedData_;
1259 
1260  std::chrono::steady_clock::time_point connectStartTime_;
1261  std::chrono::steady_clock::time_point connectEndTime_;
1262 
1263  std::chrono::milliseconds connectTimeout_{0};
1264 
1265  std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
1266 
1268  bool tfoEnabled_{false};
1269  bool tfoAttempted_{false};
1270  bool tfoFinished_{false};
1271  bool noTransparentTls_{false};
1272  bool noTSocks_{false};
1273  // Whether to track EOR or not.
1274  bool trackEor_{false};
1275  bool zeroCopyEnabled_{false};
1276  bool zeroCopyVal_{false};
1277  // zerocopy reenable logic
1280 
1281  // subclasses may cache these on first call to get
1282  mutable std::unique_ptr<const AsyncTransportCertificate> peerCertData_{
1283  nullptr};
1284  mutable std::unique_ptr<const AsyncTransportCertificate> selfCertData_{
1285  nullptr};
1286 };
1287 #ifdef _MSC_VER
1288 #pragma vtordisp(pop)
1289 #endif
1290 
1291 } // namespace folly
void setZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void scheduleImmediateRead() noexcept
Definition: AsyncSocket.h:1031
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wSS)
bool getTFOSucceded() const
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, const folly::SocketAddress &address, uint32_t connectTimeout=0)
Definition: AsyncSocket.h:288
void * ptr
virtual ssize_t tfoSendMsg(int fd, struct msghdr *msg, int msg_flags)
friend std::ostream & operator<<(std::ostream &os, const StateEnum &state)
std::unordered_map< uint32_t, folly::IOBuf * > idZeroCopyBufPtrMap_
Definition: AsyncSocket.h:1223
std::chrono::nanoseconds getConnectTime() const
Definition: AsyncSocket.h:599
flags
Definition: http_parser.h:127
virtual int detachFd()
bool connecting() const override
Definition: AsyncSocket.h:567
void shutdownWriteNow() override
virtual bool hangup() const
virtual void connectSuccess() noexcept=0
std::chrono::milliseconds getConnectTimeout() const
Definition: AsyncSocket.h:603
std::string withAddr(const std::string &s)
int setSockOpt(int level, int optname, const T *optval)
Definition: AsyncSocket.h:727
std::chrono::steady_clock::time_point getConnectStartTime() const
Definition: AsyncSocket.h:607
int setTCPProfile(int profd)
size_t getZeroCopyReenableThreshold() const
Definition: AsyncSocket.h:512
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, int fd)
Definition: AsyncSocket.h:311
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::unique_ptr< const AsyncSocketException > exception
Definition: AsyncSocket.h:882
bool isZeroCopyRequest(WriteFlags flags)
bool containsZeroCopyBuf(folly::IOBuf *ptr)
~AsyncSocket() override
WriteRequest(AsyncSocket *socket, WriteCallback *callback)
Definition: AsyncSocket.h:890
bool writable() const override
virtual bool isClosedByPeer() const
Definition: AsyncSocket.h:571
bool processZeroCopyWriteInProgress() noexcept
uint32_t getSendTimeout() const override
Definition: AsyncSocket.h:437
virtual WriteResult performWrite(const iovec *vec, uint32_t count, WriteFlags flags, uint32_t *countWritten, uint32_t *partialWritten)
void writev(WriteCallback *callback, const iovec *vec, size_t count, WriteFlags flags=WriteFlags::NONE) override
void cacheLocalAddress() const
std::unique_ptr< const AsyncTransportCertificate > peerCertData_
Definition: AsyncSocket.h:1282
int setSendBufSize(size_t bufsize)
void invokeAllErrors(const AsyncSocketException &ex)
bool isZeroCopyWriteInProgress() const noexcept
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
uint32_t sendTimeout_
The send timeout, in milliseconds.
Definition: AsyncSocket.h:1233
virtual void connectErr(const AsyncSocketException &ex) noexcept=0
virtual void handleConnect() noexcept
IoHandler(AsyncSocket *socket, EventBase *eventBase)
Definition: AsyncSocket.h:1000
void fail(const char *fn, const AsyncSocketException &ex)
Definition: AsyncSocket.h:920
STL namespace.
std::unique_ptr< EvbChangeCallback > evbChangeCb_
Definition: AsyncSocket.h:1265
void cachePeerAddress() const
double val
Definition: String.cpp:273
const AsyncTransportCertificate * getPeerCertificate() const override
Definition: AsyncSocket.h:837
size_t getRawBytesWritten() const override
Definition: AsyncSocket.h:587
shutdownWrite() called, but we are still waiting on writes to drain
Definition: AsyncSocket.h:964
virtual void setErrMessageCB(ErrMessageCallback *callback)
folly::std T
bool isZeroCopyMsg(const cmsghdr &cmsg) const
bool getTFOFinished() const
Definition: AsyncSocket.h:625
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
EventBase * eventBase_
The EventBase.
Definition: AsyncSocket.h:1240
folly::SocketAddress addr_
The address we tried to connect to.
Definition: AsyncSocket.h:1230
std::unique_ptr< const AsyncSocketException > exception
Definition: AsyncSocket.h:864
int8_t readErr_
The read error encountered, if any.
Definition: AsyncSocket.h:1238
void failErrMessageRead(const char *fn, const AsyncSocketException &ex)
requires E e noexcept(noexcept(s.error(std::move(e))))
WriteRequest * getNext() const
Definition: AsyncSocket.h:903
virtual void prepareReadBuffer(void **buf, size_t *buflen)
void handlerReady(uint16_t events) noexceptoverride
Definition: AsyncSocket.h:1005
std::chrono::steady_clock::time_point connectStartTime_
Definition: AsyncSocket.h:1260
uint16_t getMaxReadsPerEvent() const
Definition: AsyncSocket.h:463
void closeNow() override
AsyncSocket * socket_
parent socket
Definition: AsyncSocket.h:933
virtual void setSendMsgParamCB(SendMsgParamsCallback *callback)
bool updateEventRegistration()
void scheduleInitialReadWrite() noexcept
Definition: AsyncSocket.h:1040
WriteRequest * writeReqTail_
End of WriteRequest chain.
Definition: AsyncSocket.h:1251
WriteRequest * writeReqHead_
Chain of WriteRequests.
Definition: AsyncSocket.h:1250
bool error() const override
void failAllWrites(const AsyncSocketException &ex)
uint8_t shutdownFlags_
Shutdown state (ShutdownFlags)
Definition: AsyncSocket.h:1227
void writeChainImpl(WriteCallback *callback, iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags)
SendMsgParamsCallback * sendMsgParamCallback_
< Callback for retrieving
Definition: AsyncSocket.h:1248
int setRecvBufSize(size_t bufsize)
bool getTFOAttempted() const
Definition: AsyncSocket.h:615
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds connectTimeout_
Definition: AsyncSocket.h:1263
void setPeerCertificate(std::unique_ptr< const AsyncTransportCertificate > cert)
Definition: AsyncSocket.h:833
void fail(const char *fn, const AsyncSocketException &ex)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
Definition: AsyncSocket.h:1252
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
void attachEventBase(EventBase *eventBase) override
const AsyncTransportCertificate * getSelfCertificate() const override
Definition: AsyncSocket.h:846
size_t zeroCopyReenableCounter_
Definition: AsyncSocket.h:1279
void writeChain(WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
WriteResult(ssize_t ret, std::unique_ptr< const AsyncSocketException > e)
Definition: AsyncSocket.h:860
virtual void handleRead() noexcept
void registerForConnectEvents()
void setMaxReadsPerEvent(uint16_t maxReads)
Definition: AsyncSocket.h:451
virtual void writeRequest(WriteRequest *req)
void runLoopCallback() noexceptoverride
Definition: AsyncSocket.h:1018
bool setZeroCopy(bool enable)
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
ImmediateReadCB immediateReadHandler_
LoopCallback for checking read.
Definition: AsyncSocket.h:1243
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
folly::SocketAddress localAddr_
The address we are connecting from.
Definition: AsyncSocket.h:1231
void invalidState(ConnectCallback *callback)
void append(WriteRequest *next)
Definition: AsyncSocket.h:915
void destroy() override
void processZeroCopyMsg(const cmsghdr &cmsg)
void ioReady(uint16_t events) noexcept
size_t appBytesReceived_
Num of bytes received from socket.
Definition: AsyncSocket.h:1253
SocketAddress getPeerAddress() const
static const folly::SocketAddress & anyAddress()
IoHandler ioHandler_
A EventHandler to monitor the fd.
Definition: AsyncSocket.h:1242
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
virtual void getAncillaryData(folly::WriteFlags, void *) noexcept
Definition: AsyncSocket.h:170
void setEvbChangedCallback(std::unique_ptr< EvbChangeCallback > cb)
Definition: AsyncSocket.h:810
void detachEventBase() override
virtual int getFlagsImpl(folly::WriteFlags, int defaultFlags)
Definition: AsyncSocket.h:201
AsyncServerSocket::UniquePtr socket_
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
uint16_t eventFlags_
EventBase::HandlerFlags settings.
Definition: AsyncSocket.h:1228
void failRead(const char *fn, const AsyncSocketException &ex)
void setEorTracking(bool track) override
Definition: AsyncSocket.h:563
ImmediateReadCB(AsyncSocket *socket)
Definition: AsyncSocket.h:1017
bool good() const override
void writeRequestReady()
Definition: AsyncSocket.h:535
std::unique_ptr< const AsyncTransportCertificate > selfCertData_
Definition: AsyncSocket.h:1284
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
size_t zeroCopyReenableThreshold_
Definition: AsyncSocket.h:1278
void timeoutExpired() noexcept
void closeWithReset() override
virtual ErrMessageCallback * getErrMessageCallback() const
static const OptionMap emptyOptionMap
Definition: AsyncSocket.h:378
ErrMessageCallback * errMessageCallback_
TimestampCallback.
Definition: AsyncSocket.h:1246
WriteTimeout writeTimeout_
A timeout for connect and write.
Definition: AsyncSocket.h:1241
virtual int getFd() const
Definition: AsyncSocket.h:335
void disableTransparentTls()
Definition: AsyncSocket.h:789
Definition: Traits.h:588
int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
Definition: AsyncSocket.h:156
void failConnect(const char *fn, const AsyncSocketException &ex)
bool isEorTrackingEnabled() const override
Definition: AsyncSocket.h:559
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
WriteTimeout(AsyncSocket *socket, EventBase *eventBase)
Definition: AsyncSocket.h:987
virtual ReadResult performRead(void **buf, size_t *buflen, size_t *offset)
void setSelfCertificate(std::unique_ptr< const AsyncTransportCertificate > cert)
Definition: AsyncSocket.h:841
uint32_t zeroCopyBufId_
Definition: AsyncSocket.h:1216
uint32_t getNextZeroCopyBufId()
Definition: AsyncSocket.h:1202
virtual void handleWrite() noexcept
uint16_t maxReadsPerEvent_
Max reads per event loop iteration.
Definition: AsyncSocket.h:1234
std::chrono::steady_clock::time_point connectEndTime_
Definition: AsyncSocket.h:1261
size_t getAppBytesReceived() const override
Definition: AsyncSocket.h:591
virtual size_t handleErrMessages() noexcept
virtual void scheduleConnectTimeout()
SocketAddress getLocalAddress() const
int * count
int getSockOpt(int level, int optname, T *optval, socklen_t *optlen)
Definition: AsyncSocket.h:714
void timeoutExpired() noexceptoverride
Definition: AsyncSocket.h:990
size_t getAppBytesWritten() const override
Definition: AsyncSocket.h:583
std::unordered_map< folly::IOBuf *, IOBufInfo > idZeroCopyBufInfoMap_
Definition: AsyncSocket.h:1224
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
Definition: AsyncSocket.h:281
void bytesWritten(size_t count)
Definition: AsyncSocket.h:924
ReadCallback * readCallback_
ReadCallback.
Definition: AsyncSocket.h:1249
size_t getRawBytesReceived() const override
Definition: AsyncSocket.h:595
void setReadCB(ReadCallback *callback) override
IoHandler(AsyncSocket *socket, EventBase *eventBase, int fd)
Definition: AsyncSocket.h:1002
virtual void setPreReceivedData(std::unique_ptr< IOBuf > data)
Definition: AsyncSocket.h:770
uint32_t getTotalBytesWritten() const
Definition: AsyncSocket.h:911
const char * string
Definition: Conv.cpp:212
StateEnum state_
StateEnum describing current state.
Definition: AsyncSocket.h:1226
BufferCallback * bufferCallback_
Definition: AsyncSocket.h:1267
void setZeroCopyReenableThreshold(size_t threshold)
void close() override
static set< string > s
const
Definition: upload.py:398
virtual void invokeConnectSuccess()
EventBase * getEventBase() const override
Definition: AsyncSocket.h:328
void addZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
int fd_
The socket file descriptor.
Definition: AsyncSocket.h:1229
std::chrono::steady_clock::time_point getConnectEndTime() const
Definition: AsyncSocket.h:611
bool readable() const override
void setBufferCallback(BufferCallback *cb)
virtual uint32_t getAncillaryDataSize(folly::WriteFlags) noexcept
Definition: AsyncSocket.h:180
ReadResult(ssize_t ret, std::unique_ptr< const AsyncSocketException > e)
Definition: AsyncSocket.h:878
void writeImpl(WriteCallback *callback, const iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE)
bool isPending() const override
virtual SendMsgParamsCallback * getSendMsgParamsCB() const
bool getZeroCopy() const
Definition: AsyncSocket.h:504
folly::Function< void()> callback_
void failWrite(const char *fn, WriteCallback *callback, size_t bytesWritten, const AsyncSocketException &ex)
writes have been completely shut down
Definition: AsyncSocket.h:966
virtual void checkForImmediateRead() noexcept
AsyncSocket::WriteResult sendSocketMessage(int fd, struct msghdr *msg, int msg_flags)
virtual bool isClosedBySelf() const
Definition: AsyncSocket.h:577
ReadCallback * getReadCallback() const override
bool isDetachable() const override
virtual int getSockOptVirtual(int level, int optname, void *optval, socklen_t *optlen)
Definition: AsyncSocket.h:743
int socketConnect(const struct sockaddr *addr, socklen_t len)
virtual void handleInitialReadWrite() noexcept
int setCongestionFlavor(const std::string &cname)
bool operator<(const OptionKey &other) const
Definition: AsyncSocket.h:362
std::unique_ptr< folly::IOBuf > buf_
Definition: AsyncSocket.h:1220
ThreadPoolListHook * addr
WriteCallback * callback_
completion callback
Definition: AsyncSocket.h:935
int apply(int fd, int val) const
Definition: AsyncSocket.h:368
void shutdownWrite() override
size_t appBytesWritten_
Num of bytes written to socket.
Definition: AsyncSocket.h:1254
int setNoDelay(bool noDelay)
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
void adjustZeroCopyFlags(folly::WriteFlags &flags)
WriteCallback * getCallback() const
Definition: AsyncSocket.h:907
virtual void invokeConnectErr(const AsyncSocketException &ex)
int setQuickAck(bool quickack)
state
Definition: http_parser.c:272
virtual int setSockOptVirtual(int level, int optname, void const *optval, socklen_t optlen)
Definition: AsyncSocket.h:758
ConnectCallback * connectCallback_
ConnectCallback.
Definition: AsyncSocket.h:1245
uint32_t getZeroCopyBufId() const
Definition: AsyncSocket.h:508
void releaseZeroCopyBuf(uint32_t id)
def next(obj)
Definition: ast.py:58
std::unique_ptr< IOBuf > preReceivedData_
Definition: AsyncSocket.h:1258
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, const std::string &ip, uint16_t port, uint32_t connectTimeout=0)
Definition: AsyncSocket.h:299