30 #include <sys/types.h> 66 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS 67 #define SO_NO_TRANSPARENT_TLS 200 70 #if defined __linux__ && !defined SO_NO_TSOCKS 71 #define SO_NO_TSOCKS 201 79 #pragma vtordisp(push, 2) 83 typedef std::unique_ptr<AsyncSocket, Destructor>
UniquePtr;
132 virtual void errMessage(
const cmsghdr& cmsg)
noexcept = 0;
157 return getFlagsImpl(
flags, getDefaultFlags(
flags, zeroCopyEnabled));
185 static const size_t maxAncillaryDataSize{0x5000};
292 return std::shared_ptr<AsyncSocket>(
304 return std::shared_ptr<AsyncSocket>(
363 if (level == other.
level) {
364 return optname < other.
optname;
366 return level < other.
level;
369 return setsockopt(fd, level, optname, &val,
sizeof(val));
395 const OptionMap& options = emptyOptionMap,
530 std::unique_ptr<folly::IOBuf>&& buf,
540 void close()
override;
549 virtual bool hangup()
const;
550 bool good()
const override;
551 bool error()
const override;
698 #define SO_SET_NAMESPACE 41 713 template <
typename T>
714 int getSockOpt(
int level,
int optname,
T* optval, socklen_t* optlen) {
715 return getsockopt(
fd_, level, optname, (
void*)optval, optlen);
726 template <
typename T>
834 std::unique_ptr<const AsyncTransportCertificate> cert) {
842 std::unique_ptr<const AsyncTransportCertificate> cert) {
860 WriteResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
861 : writeReturn(ret), exception(
std::
move(e)) {}
878 ReadResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
879 : readReturn(ret), exception(
std::
move(e)) {}
899 virtual void consume() = 0;
901 virtual bool isComplete() = 0;
912 return totalBytesWritten_;
916 assert(next_ ==
nullptr);
925 totalBytesWritten_ +=
uint32_t(count);
1020 socket_->checkForImmediateRead();
1085 std::unique_ptr<folly::IOBuf>&& buf,
1109 std::unique_ptr<folly::IOBuf>&& buf,
1143 virtual ssize_t
tfoSendMsg(
int fd,
struct msghdr* msg,
int msg_flags);
1181 size_t bytesWritten,
1220 std::unique_ptr<folly::IOBuf>
buf_;
1288 #pragma vtordisp(pop)
void setZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void scheduleImmediateRead() noexcept
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wSS)
bool getTFOSucceded() const
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, const folly::SocketAddress &address, uint32_t connectTimeout=0)
virtual ssize_t tfoSendMsg(int fd, struct msghdr *msg, int msg_flags)
friend std::ostream & operator<<(std::ostream &os, const StateEnum &state)
std::unordered_map< uint32_t, folly::IOBuf * > idZeroCopyBufPtrMap_
std::chrono::nanoseconds getConnectTime() const
bool connecting() const override
void shutdownWriteNow() override
virtual bool hangup() const
virtual void connectSuccess() noexcept=0
std::chrono::milliseconds getConnectTimeout() const
std::string withAddr(const std::string &s)
int setSockOpt(int level, int optname, const T *optval)
std::chrono::steady_clock::time_point getConnectStartTime() const
int setTCPProfile(int profd)
size_t getZeroCopyReenableThreshold() const
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, int fd)
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
std::unique_ptr< const AsyncSocketException > exception
bool isZeroCopyRequest(WriteFlags flags)
bool containsZeroCopyBuf(folly::IOBuf *ptr)
WriteRequest(AsyncSocket *socket, WriteCallback *callback)
bool writable() const override
virtual bool isClosedByPeer() const
bool processZeroCopyWriteInProgress() noexcept
uint32_t getSendTimeout() const override
virtual WriteResult performWrite(const iovec *vec, uint32_t count, WriteFlags flags, uint32_t *countWritten, uint32_t *partialWritten)
void writev(WriteCallback *callback, const iovec *vec, size_t count, WriteFlags flags=WriteFlags::NONE) override
void cacheLocalAddress() const
std::unique_ptr< const AsyncTransportCertificate > peerCertData_
int setSendBufSize(size_t bufsize)
void invokeAllErrors(const AsyncSocketException &ex)
bool isZeroCopyWriteInProgress() const noexcept
constexpr detail::Map< Move > move
uint32_t sendTimeout_
The send timeout, in milliseconds.
virtual void connectErr(const AsyncSocketException &ex) noexcept=0
virtual void handleConnect() noexcept
IoHandler(AsyncSocket *socket, EventBase *eventBase)
void fail(const char *fn, const AsyncSocketException &ex)
std::unique_ptr< EvbChangeCallback > evbChangeCb_
void cachePeerAddress() const
const AsyncTransportCertificate * getPeerCertificate() const override
size_t getRawBytesWritten() const override
shutdownWrite() called, but we are still waiting on writes to drain
virtual void setErrMessageCB(ErrMessageCallback *callback)
bool isZeroCopyMsg(const cmsghdr &cmsg) const
bool getTFOFinished() const
—— Concurrent Priority Queue Implementation ——
EventBase * eventBase_
The EventBase.
folly::SocketAddress addr_
The address we tried to connect to.
std::unique_ptr< const AsyncSocketException > exception
int8_t readErr_
The read error encountered, if any.
void failErrMessageRead(const char *fn, const AsyncSocketException &ex)
requires E e noexcept(noexcept(s.error(std::move(e))))
WriteRequest * getNext() const
virtual void prepareReadBuffer(void **buf, size_t *buflen)
void handlerReady(uint16_t events) noexcept override
std::chrono::steady_clock::time_point connectStartTime_
uint16_t getMaxReadsPerEvent() const
AsyncSocket * socket_
parent socket
virtual void setSendMsgParamCB(SendMsgParamsCallback *callback)
bool updateEventRegistration()
void scheduleInitialReadWrite() noexcept
WriteRequest * writeReqTail_
End of WriteRequest chain.
WriteRequest * writeReqHead_
Chain of WriteRequests.
bool error() const override
void failAllWrites(const AsyncSocketException &ex)
uint8_t shutdownFlags_
Shutdown state (ShutdownFlags)
void writeChainImpl(WriteCallback *callback, iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags)
SendMsgParamsCallback * sendMsgParamCallback_
Callback for retrieving sendmsg() parameters.
int setRecvBufSize(size_t bufsize)
bool getTFOAttempted() const
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds connectTimeout_
void setPeerCertificate(std::unique_ptr< const AsyncTransportCertificate > cert)
void fail(const char *fn, const AsyncSocketException &ex)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
void attachEventBase(EventBase *eventBase) override
const AsyncTransportCertificate * getSelfCertificate() const override
size_t zeroCopyReenableCounter_
void writeChain(WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
WriteResult(ssize_t ret, std::unique_ptr< const AsyncSocketException > e)
virtual void handleRead() noexcept
void registerForConnectEvents()
void setMaxReadsPerEvent(uint16_t maxReads)
virtual void writeRequest(WriteRequest *req)
bool setZeroCopy(bool enable)
std::map< OptionKey, int > OptionMap
ImmediateReadCB immediateReadHandler_
LoopCallback for checking read.
void runInLoop(LoopCallback *callback, bool thisIteration=false)
folly::SocketAddress localAddr_
The address we are connecting from.
void invalidState(ConnectCallback *callback)
void append(WriteRequest *next)
void processZeroCopyMsg(const cmsghdr &cmsg)
void ioReady(uint16_t events) noexcept
size_t appBytesReceived_
Num of bytes received from socket.
SocketAddress getPeerAddress() const
static const folly::SocketAddress & anyAddress()
IoHandler ioHandler_
An EventHandler to monitor the fd.
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
virtual void getAncillaryData(folly::WriteFlags, void *) noexcept
void setEvbChangedCallback(std::unique_ptr< EvbChangeCallback > cb)
void detachEventBase() override
virtual int getFlagsImpl(folly::WriteFlags, int defaultFlags)
AsyncServerSocket::UniquePtr socket_
constexpr auto data(C &c) -> decltype(c.data())
uint16_t eventFlags_
EventBase::HandlerFlags settings.
void failRead(const char *fn, const AsyncSocketException &ex)
void setEorTracking(bool track) override
bool good() const override
std::unique_ptr< const AsyncTransportCertificate > selfCertData_
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
size_t zeroCopyReenableThreshold_
void timeoutExpired() noexcept
void closeWithReset() override
virtual ErrMessageCallback * getErrMessageCallback() const
static const OptionMap emptyOptionMap
ErrMessageCallback * errMessageCallback_
TimestampCallback.
WriteTimeout writeTimeout_
A timeout for connect and write.
virtual int getFd() const
void disableTransparentTls()
int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
void failConnect(const char *fn, const AsyncSocketException &ex)
bool isEorTrackingEnabled() const override
NetworkSocket socket(int af, int type, int protocol)
WriteTimeout(AsyncSocket *socket, EventBase *eventBase)
virtual ReadResult performRead(void **buf, size_t *buflen, size_t *offset)
void setSelfCertificate(std::unique_ptr< const AsyncTransportCertificate > cert)
uint32_t getNextZeroCopyBufId()
virtual void handleWrite() noexcept
uint16_t maxReadsPerEvent_
Max reads per event loop iteration.
std::chrono::steady_clock::time_point connectEndTime_
size_t getAppBytesReceived() const override
virtual size_t handleErrMessages() noexcept
virtual void scheduleConnectTimeout()
SocketAddress getLocalAddress() const
int getSockOpt(int level, int optname, T *optval, socklen_t *optlen)
void timeoutExpired() noexcept override
size_t getAppBytesWritten() const override
std::unordered_map< folly::IOBuf *, IOBufInfo > idZeroCopyBufInfoMap_
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
void bytesWritten(size_t count)
ReadCallback * readCallback_
ReadCallback.
size_t getRawBytesReceived() const override
void setReadCB(ReadCallback *callback) override
IoHandler(AsyncSocket *socket, EventBase *eventBase, int fd)
virtual void setPreReceivedData(std::unique_ptr< IOBuf > data)
uint32_t getTotalBytesWritten() const
StateEnum state_
StateEnum describing current state.
BufferCallback * bufferCallback_
void setZeroCopyReenableThreshold(size_t threshold)
virtual void invokeConnectSuccess()
EventBase * getEventBase() const override
void addZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
int fd_
The socket file descriptor.
std::chrono::steady_clock::time_point getConnectEndTime() const
bool readable() const override
void setBufferCallback(BufferCallback *cb)
virtual uint32_t getAncillaryDataSize(folly::WriteFlags) noexcept
ReadResult(ssize_t ret, std::unique_ptr< const AsyncSocketException > e)
void writeImpl(WriteCallback *callback, const iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE)
bool isPending() const override
virtual SendMsgParamsCallback * getSendMsgParamsCB() const
folly::Function< void()> callback_
void failWrite(const char *fn, WriteCallback *callback, size_t bytesWritten, const AsyncSocketException &ex)
writes have been completely shut down
virtual void checkForImmediateRead() noexcept
AsyncSocket::WriteResult sendSocketMessage(int fd, struct msghdr *msg, int msg_flags)
virtual bool isClosedBySelf() const
ReadCallback * getReadCallback() const override
bool isDetachable() const override
virtual int getSockOptVirtual(int level, int optname, void *optval, socklen_t *optlen)
int socketConnect(const struct sockaddr *addr, socklen_t len)
virtual void handleInitialReadWrite() noexcept
int setCongestionFlavor(const std::string &cname)
bool operator<(const OptionKey &other) const
virtual ~ConnectCallback()=default
std::unique_ptr< folly::IOBuf > buf_
ThreadPoolListHook * addr
WriteCallback * callback_
completion callback
int apply(int fd, int val) const
void shutdownWrite() override
size_t appBytesWritten_
Num of bytes written to socket.
int setNoDelay(bool noDelay)
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
void adjustZeroCopyFlags(folly::WriteFlags &flags)
WriteCallback * getCallback() const
virtual void invokeConnectErr(const AsyncSocketException &ex)
int setQuickAck(bool quickack)
virtual int setSockOptVirtual(int level, int optname, void const *optval, socklen_t optlen)
ConnectCallback * connectCallback_
ConnectCallback.
uint32_t getZeroCopyBufId() const
void releaseZeroCopyBuf(uint32_t id)
std::unique_ptr< IOBuf > preReceivedData_
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb, const std::string &ip, uint16_t port, uint32_t connectTimeout=0)