31 #include <boost/preprocessor/control/if.hpp> 34 #include <sys/types.h> 38 #define FOLLY_HAVE_VLA_01 1 40 #define FOLLY_HAVE_VLA_01 0 44 using std::unique_ptr;
51 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 55 #endif // FOLLY_HAVE_MSG_ERRQUEUE 62 "socket closed locally");
65 "socket shutdown for writes");
90 unique_ptr<IOBuf>&& ioBuf,
98 throw std::bad_alloc();
127 bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
168 assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
169 currentOp->iov_base =
182 const struct iovec*
ops,
186 unique_ptr<IOBuf>&& ioBuf,
228 int msg_flags = MSG_DONTWAIT;
230 #ifdef MSG_NOSIGNAL // Linux-only 231 msg_flags |= MSG_NOSIGNAL;
237 msg_flags |= MSG_MORE;
240 #endif // MSG_NOSIGNAL 243 msg_flags |= MSG_EOR;
257 void disableTransparentFunctions(
int fd,
bool noTransparentTls,
bool noTSocks) {
259 (void)noTransparentTls;
262 if (noTransparentTls) {
264 VLOG(5) <<
"Disabling TTLS for fd " << fd;
265 ::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS,
nullptr, 0);
268 VLOG(5) <<
"Disabling TSOCKS for fd " << fd;
282 VLOG(5) <<
"new AsyncSocket()";
291 VLOG(5) <<
"new AsyncSocket(" <<
this <<
", evb=" << evb <<
")";
300 connect(
nullptr, address, connectTimeout);
309 connect(
nullptr, ip, port, connectTimeout);
318 VLOG(5) <<
"new AsyncSocket(" <<
this <<
", evb=" << evb <<
", fd=" << fd
319 <<
", zeroCopyBufId=" << zeroCopyBufId <<
")";
359 VLOG(7) <<
"actual destruction of AsyncSocket(this=" <<
this 365 VLOG(5) <<
"AsyncSocket::destroy(this=" <<
this <<
", evb=" <<
eventBase_ 366 <<
", fd=" <<
fd_ <<
", state=" <<
state_;
376 VLOG(6) <<
"AsyncSocket::detachFd(this=" <<
this <<
", fd=" <<
fd_ 382 socketSet->remove(
fd_);
401 const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
402 const auto newSS = wNewSS.lock();
413 if (newSS &&
fd_ != -1) {
421 int rv = fcntl(
fd_, F_SETFD, FD_CLOEXEC);
423 auto errnoCopy = errno;
426 withAddr(
"failed to set close-on-exec flag"),
456 sockaddr_storage addrStorage;
457 sockaddr* saddr =
reinterpret_cast<sockaddr*
>(&addrStorage);
467 auto errnoCopy = errno;
470 withAddr(
"failed to create socket"),
484 auto errnoCopy = errno;
487 withAddr(
"failed to get socket flags"),
490 int rv = fcntl(
fd_, F_SETFL, flags | O_NONBLOCK);
492 auto errnoCopy = errno;
495 withAddr(
"failed to put socket in non-blocking mode"),
499 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE) 501 rv = fcntl(
fd_, F_SETNOSIGPIPE, 1);
503 auto errnoCopy = errno;
506 "failed to enable F_SETNOSIGPIPE on socket",
516 if (address.getFamily() != AF_UNIX) {
521 VLOG(5) <<
"AsyncSocket::connect(this=" <<
this <<
", evb=" <<
eventBase_ 522 <<
", fd=" <<
fd_ <<
", host=" << address.describe().c_str();
527 if (
setsockopt(
fd_, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one))) {
528 auto errnoCopy = errno;
532 "failed to setsockopt prior to bind on " + bindAddr.describe(),
536 bindAddr.getAddress(&addrStorage);
538 if (
bind(
fd_, saddr, bindAddr.getActualSize()) != 0) {
539 auto errnoCopy = errno;
543 "failed to bind to async socket: " + bindAddr.describe(),
549 for (
const auto& opt : options) {
550 rv = opt.first.apply(
fd_, opt.second);
552 auto errnoCopy = errno;
555 withAddr(
"failed to set socket option"),
561 address.getAddress(&addrStorage);
576 }
catch (
const std::exception& ex) {
578 VLOG(4) <<
"AsyncSocket::connect(this=" <<
this <<
", fd=" <<
fd_ 579 <<
"): unexpected " <<
typeid(ex).
name()
580 <<
" exception: " << ex.what();
583 withAddr(
string(
"unexpected exception: ") + ex.what()));
590 VLOG(8) <<
"AsyncSocket::connect succeeded immediately; this=" <<
this;
603 auto errnoCopy = errno;
604 if (errnoCopy == EINPROGRESS) {
610 "connect failed (immediately)",
625 withAddr(
"failed to schedule AsyncSocket connect timeout"));
639 withAddr(
"failed to register AsyncSocket connect handler"));
653 }
catch (
const std::exception& ex) {
682 withAddr(
"failed to reschedule send timeout in setSendTimeout"));
692 VLOG(6) <<
"AsyncSocket::setErrMessageCB() this=" <<
this <<
", fd=" <<
fd_ 693 <<
", callback=" << callback <<
", state=" <<
state_;
701 if (callback !=
nullptr) {
704 LOG(ERROR) <<
"Failed to set ErrMessageCallback=" << callback
705 <<
" for Unix Doamin Socket where MSG_ERRQUEUE is unsupported," 716 if (!msgErrQueueSupported) {
724 if (callback ==
nullptr) {
769 VLOG(6) <<
"AsyncSocket::setReadCallback() this=" <<
this <<
", fd=" <<
fd_ 770 <<
", callback=" << callback <<
", state=" <<
state_;
798 if (callback !=
nullptr) {
861 if (msgErrQueueSupported) {
868 int val = enable ? 1 : 0;
883 socklen_t optlen =
sizeof(
val);
887 enable = val ?
true :
false;
931 CHECK(p.buf_.get() ==
nullptr);
945 auto ptr = iter->second;
948 if (0 == --iter1->second.count_) {
958 CHECK(p.buf_.get() ==
nullptr);
968 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 969 if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
970 (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
971 const struct sock_extended_err* serr =
972 reinterpret_cast<const struct sock_extended_err*
>(CMSG_DATA(&cmsg));
982 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 983 const struct sock_extended_err* serr =
984 reinterpret_cast<const struct sock_extended_err*
>(CMSG_DATA(&cmsg));
989 VLOG(2) <<
"AsyncSocket::processZeroCopyMsg(): setting " 990 <<
"zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED " 1009 op.iov_base =
const_cast<void*
>(buf);
1011 writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
1019 writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
1024 unique_ptr<IOBuf>&& buf,
1028 constexpr
size_t kSmallSizeMax = 64;
1029 size_t count = buf->countChainElements();
1030 if (count <= kSmallSizeMax) {
1049 unique_ptr<IOBuf>&& buf,
1051 size_t veclen = buf->fillIov(vec, count);
1059 unique_ptr<IOBuf>&& buf,
1061 VLOG(6) <<
"AsyncSocket::writev() this=" <<
this <<
", fd=" <<
fd_ 1062 <<
", callback=" << callback <<
", count=" << count
1065 unique_ptr<IOBuf> ioBuf(
std::move(buf));
1082 ssize_t bytesWritten = 0;
1083 bool mustRegister =
false;
1093 vec,
uint32_t(count), flags, &countWritten, &partialWritten);
1094 bytesWritten = writeResult.writeReturn;
1095 if (bytesWritten < 0) {
1096 auto errnoCopy = errno;
1097 if (writeResult.exception) {
1098 return failWrite(__func__, callback, 0, *writeResult.exception);
1104 return failWrite(__func__, callback, 0, ex);
1105 }
else if (countWritten == count) {
1130 mustRegister =
true;
1150 }
catch (
const std::exception& ex) {
1154 withAddr(
string(
"failed to append new WriteRequest: ") + ex.what()));
1155 return failWrite(__func__, callback,
size_t(bytesWritten), tex);
1180 withAddr(
"failed to schedule send timeout"));
1199 VLOG(5) <<
"AsyncSocket::close(): this=" <<
this <<
", fd_=" <<
fd_ 1200 <<
", state=" <<
state_ <<
", shutdownFlags=" << std::hex
1253 VLOG(5) <<
"AsyncSocket::closeNow(): this=" <<
this <<
", fd_=" <<
fd_ 1254 <<
", state=" <<
state_ <<
", shutdownFlags=" << std::hex
1320 LOG(
DFATAL) <<
"AsyncSocket::closeNow() (this=" <<
this <<
", fd=" <<
fd_ 1321 <<
") called in unknown state " <<
state_;
1328 struct linger optLinger = {1, 0};
1329 if (
setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1330 VLOG(2) <<
"AsyncSocket::closeWithReset(): error setting SO_LINGER " 1331 <<
"on " <<
fd_ <<
": errno=" << errno;
1340 VLOG(5) <<
"AsyncSocket::shutdownWrite(): this=" <<
this <<
", fd=" <<
fd_ 1341 <<
", state=" <<
state_ <<
", shutdownFlags=" << std::hex
1359 VLOG(5) <<
"AsyncSocket::shutdownWriteNow(): this=" <<
this <<
", fd=" <<
fd_ 1360 <<
", state=" <<
state_ <<
", shutdownFlags=" << std::hex
1382 switch (static_cast<StateEnum>(
state_)) {
1431 VLOG(4) <<
"AsyncSocket::shutdownWriteNow() (this=" <<
this 1432 <<
", fd=" <<
fd_ <<
") in unexpected state " <<
state_ 1433 <<
" with SHUT_WRITE not set (" << std::hex << (int)
shutdownFlags_ 1439 LOG(
DFATAL) <<
"AsyncSocket::shutdownWriteNow() (this=" <<
this 1440 <<
", fd=" <<
fd_ <<
") called in unknown state " <<
state_;
1447 struct pollfd fds[1];
1449 fds[0].events = POLLIN;
1451 int rc =
poll(fds, 1, 0);
1459 struct pollfd fds[1];
1461 fds[0].events = POLLOUT;
1463 int rc =
poll(fds, 1, 0);
1477 #ifdef POLLRDHUP // Linux-only 1478 struct pollfd fds[1];
1480 fds[0].events = POLLRDHUP | POLLHUP;
1483 return (fds[0].revents & (POLLRDHUP | POLLHUP)) != 0;
1501 VLOG(5) <<
"AsyncSocket::attachEventBase(this=" <<
this <<
", fd=" <<
fd_ 1502 <<
", old evb=" <<
eventBase_ <<
", new evb=" << eventBase
1520 VLOG(5) <<
"AsyncSocket::detachEventBase(this=" <<
this <<
", fd=" <<
fd_ 1549 }
catch (
const std::system_error& e) {
1550 if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1551 VLOG(2) <<
"Error caching addresses: " << e.code().value() <<
", " 1552 << e.code().message();
1591 VLOG(4) <<
"AsyncSocket::setNoDelay() called on non-open socket " <<
this 1592 <<
"(state=" <<
state_ <<
")";
1596 int value = noDelay ? 1 : 0;
1597 if (
setsockopt(
fd_, IPPROTO_TCP, TCP_NODELAY, &value,
sizeof(value)) != 0) {
1598 int errnoCopy = errno;
1599 VLOG(2) <<
"failed to update TCP_NODELAY option on AsyncSocket " <<
this 1600 <<
" (fd=" <<
fd_ <<
", state=" <<
state_ 1609 #ifndef TCP_CONGESTION 1610 #define TCP_CONGESTION 13 1614 VLOG(4) <<
"AsyncSocket::setCongestionFlavor() called on non-open " 1615 <<
"socket " <<
this <<
"(state=" <<
state_ <<
")";
1624 socklen_t(cname.length() + 1)) != 0) {
1625 int errnoCopy = errno;
1626 VLOG(2) <<
"failed to update TCP_CONGESTION option on AsyncSocket " <<
this 1627 <<
"(fd=" <<
fd_ <<
", state=" <<
state_ 1638 VLOG(4) <<
"AsyncSocket::setQuickAck() called on non-open socket " <<
this 1639 <<
"(state=" <<
state_ <<
")";
1643 #ifdef TCP_QUICKACK // Linux-only 1644 int value = quickack ? 1 : 0;
1645 if (
setsockopt(
fd_, IPPROTO_TCP, TCP_QUICKACK, &value,
sizeof(value)) != 0) {
1646 int errnoCopy = errno;
1647 VLOG(2) <<
"failed to update TCP_QUICKACK option on AsyncSocket" <<
this 1648 <<
"(fd=" <<
fd_ <<
", state=" <<
state_ 1661 VLOG(4) <<
"AsyncSocket::setSendBufSize() called on non-open socket " 1662 <<
this <<
"(state=" <<
state_ <<
")";
1666 if (
setsockopt(
fd_, SOL_SOCKET, SO_SNDBUF, &bufsize,
sizeof(bufsize)) != 0) {
1667 int errnoCopy = errno;
1668 VLOG(2) <<
"failed to update SO_SNDBUF option on AsyncSocket" <<
this 1669 <<
"(fd=" <<
fd_ <<
", state=" <<
state_ 1679 VLOG(4) <<
"AsyncSocket::setRecvBufSize() called on non-open socket " 1680 <<
this <<
"(state=" <<
state_ <<
")";
1684 if (
setsockopt(
fd_, SOL_SOCKET, SO_RCVBUF, &bufsize,
sizeof(bufsize)) != 0) {
1685 int errnoCopy = errno;
1686 VLOG(2) <<
"failed to update SO_RCVBUF option on AsyncSocket" <<
this 1687 <<
"(fd=" <<
fd_ <<
", state=" <<
state_ 1697 VLOG(4) <<
"AsyncSocket::setTCPProfile() called on non-open socket " <<
this 1698 <<
"(state=" <<
state_ <<
")";
1703 int errnoCopy = errno;
1704 VLOG(2) <<
"failed to set socket namespace option on AsyncSocket" <<
this 1705 <<
"(fd=" <<
fd_ <<
", state=" <<
state_ 1714 VLOG(7) <<
"AsyncSocket::ioRead() this=" <<
this <<
", fd=" <<
fd_ 1715 <<
", events=" << std::hex << events <<
", state=" <<
state_;
1741 }
else if (relevantEvents == EventHandler::READ_WRITE) {
1757 VLOG(4) <<
"AsyncSocket::ioRead() called with unexpected events " 1758 << std::hex << events <<
"(this=" <<
this <<
")";
1765 VLOG(5) <<
"AsyncSocket::performRead() this=" <<
this <<
", buf=" << *buf
1766 <<
", buflen=" << *buflen;
1769 VLOG(5) <<
"AsyncSocket::performRead() this=" <<
this 1770 <<
", reading pre-received data";
1784 ssize_t bytes =
recv(
fd_, *buf, *buflen, MSG_DONTWAIT);
1786 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1807 VLOG(5) <<
"AsyncSocket::handleErrMessages() this=" <<
this <<
", fd=" <<
fd_ 1810 VLOG(7) <<
"AsyncSocket::handleErrMessages(): " 1811 <<
"no callback installed - exiting.";
1815 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 1821 entry.iov_base = &
data;
1822 entry.iov_len =
sizeof(
data);
1823 msg.msg_iov = &entry;
1825 msg.msg_name =
nullptr;
1826 msg.msg_namelen = 0;
1827 msg.msg_control = ctrl;
1828 msg.msg_controllen =
sizeof(ctrl);
1835 VLOG(5) <<
"AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
1838 if (errno != EAGAIN) {
1839 auto errnoCopy = errno;
1840 LOG(ERROR) <<
"::recvmsg exited with code " << ret
1841 <<
", errno: " << errnoCopy;
1852 for (
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1853 cmsg !=
nullptr && cmsg->cmsg_len != 0;
1854 cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1867 #endif // FOLLY_HAVE_MSG_ERRQUEUE 1882 VLOG(5) <<
"AsyncSocket::handleRead() this=" <<
this <<
", fd=" <<
fd_ 1911 void* buf =
nullptr;
1912 size_t buflen = 0, offset = 0;
1915 VLOG(5) <<
"prepareReadBuffer() buf=" << buf <<
", buflen=" << buflen;
1918 }
catch (
const std::exception& ex) {
1921 string(
"ReadCallback::getReadBuffer() " 1922 "threw exception: ") +
1928 "ReadCallback::getReadBuffer() threw " 1929 "non-exception type");
1935 "ReadCallback::getReadBuffer() returned " 1941 auto readResult =
performRead(&buf, &buflen, &offset);
1942 auto bytesRead = readResult.readReturn;
1943 VLOG(4) <<
"this=" <<
this <<
", AsyncSocket::handleRead() got " 1944 << bytesRead <<
" bytes";
1945 if (bytesRead > 0) {
1950 VLOG(5) <<
"this=" <<
this <<
", AsyncSocket::handleRead() got " 1951 <<
"buf=" << buf <<
", " << bytesRead <<
"/" << buflen
1952 <<
", offset=" << offset;
1955 readBuf->trimEnd(buflen - offset - bytesRead);
1963 if (
size_t(bytesRead) < buflen) {
1971 if (readResult.exception) {
1972 return failRead(__func__, *readResult.exception);
1974 auto errnoCopy = errno;
2020 VLOG(5) <<
"AsyncSocket::handleWrite() this=" <<
this <<
", fd=" <<
fd_ 2040 if (writeResult.writeReturn < 0) {
2041 if (writeResult.exception) {
2042 return failWrite(__func__, *writeResult.exception);
2044 auto errnoCopy = errno;
2135 withAddr(
"failed to reschedule write timeout"));
2207 VLOG(5) <<
"AsyncSocket::handleConnect() this=" <<
this <<
", fd=" <<
fd_ 2225 socklen_t len =
sizeof(
error);
2226 int rv =
getsockopt(
fd_, SOL_SOCKET, SO_ERROR, &error, &len);
2228 auto errnoCopy = errno;
2231 withAddr(
"error calling getsockopt() after connect"),
2233 VLOG(4) <<
"AsyncSocket::handleConnect(this=" <<
this <<
", fd=" <<
fd_ 2234 <<
" host=" <<
addr_.
describe() <<
") exception:" << ex.what();
2241 VLOG(2) <<
"AsyncSocket::handleConnect(this=" <<
this <<
", fd=" <<
fd_ 2242 <<
" host=" <<
addr_.
describe() <<
") exception: " << ex.what();
2260 VLOG(7) <<
"AsyncSocket " <<
this <<
": fd " <<
fd_ 2261 <<
"successfully connected; state=" <<
state_;
2286 VLOG(7) <<
"AsyncSocket " <<
this <<
", fd " <<
fd_ <<
": timeout expired: " 2322 ssize_t totalWritten = 0;
2324 sockaddr_storage
addr;
2326 msg->msg_name = &
addr;
2327 msg->msg_namelen = len;
2329 if (totalWritten >= 0) {
2335 }
else if (errno == EINPROGRESS) {
2336 VLOG(4) <<
"TFO falling back to connecting";
2346 WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2351 }
else if (errno == EOPNOTSUPP) {
2353 VLOG(4) <<
"TFO not supported";
2369 WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2371 }
else if (errno == EAGAIN) {
2378 std::make_unique<AsyncSocketException>(
2382 totalWritten =
::sendmsg(fd, msg, msg_flags);
2398 msg.msg_name =
nullptr;
2399 msg.msg_namelen = 0;
2400 msg.msg_iov =
const_cast<iovec*
>(
vec);
2406 msg.msg_controllen);
2408 if (msg.msg_controllen != 0) {
2409 msg.msg_control =
reinterpret_cast<char*
>(alloca(msg.msg_controllen));
2412 msg.msg_control =
nullptr;
2417 auto totalWritten = writeResult.writeReturn;
2425 totalWritten = writeResult.writeReturn;
2427 if (totalWritten < 0) {
2428 bool tryAgain = (errno == EAGAIN);
2434 tryAgain |= (errno == ENOTCONN);
2437 if (!writeResult.exception && tryAgain) {
2440 *partialWritten = 0;
2445 *partialWritten = 0;
2453 for (bytesWritten =
uint32_t(totalWritten), n = 0; n <
count; ++n) {
2454 const iovec*
v = vec + n;
2455 if (v->iov_len > bytesWritten) {
2458 *partialWritten = bytesWritten;
2462 bytesWritten -=
uint32_t(v->iov_len);
2465 assert(bytesWritten == 0);
2467 *partialWritten = 0;
2481 VLOG(5) <<
"AsyncSocket::updateEventRegistration(this=" <<
this 2497 withAddr(
"failed to update AsyncSocket event registration"));
2498 fail(
"updateEventRegistration", ex);
2559 withAddr(
"socket closing after error"));
2570 VLOG(4) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2572 <<
"): failed in " << fn <<
"(): " << ex.what();
2578 VLOG(5) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2580 <<
"): failed while connecting in " << fn <<
"(): " << ex.what();
2588 VLOG(5) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2590 <<
"): failed while reading in " << fn <<
"(): " << ex.what();
2605 VLOG(5) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2607 <<
"): failed while reading message in " << fn <<
"(): " << ex.what();
2620 VLOG(5) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2622 <<
"): failed while writing in " << fn <<
"(): " << ex.what();
2635 callback->
writeErr(bytesWritten, ex);
2645 size_t bytesWritten,
2649 VLOG(4) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2651 <<
"): failed while writing in " << fn <<
"(): " << ex.what();
2654 if (callback !=
nullptr) {
2655 callback->
writeErr(bytesWritten, ex);
2677 VLOG(5) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2678 <<
"): connect() called in invalid state " <<
state_;
2689 "connect() called with socket in invalid state");
2708 VLOG(4) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2709 <<
"): setErrMessageCB(" << callback <<
") called in invalid state " 2714 msgErrQueueSupported
2715 ?
"setErrMessageCB() called with socket in invalid state" 2716 :
"This platform does not support socket error message notifications");
2749 VLOG(4) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2750 <<
"): setReadCallback(" << callback <<
") called in invalid state " 2755 "setReadCallback() called with socket in " 2771 VLOG(4) <<
"AsyncSocket(this=" <<
this <<
", fd=" <<
fd_ 2772 <<
"): write() called in invalid state " <<
state_;
2776 withAddr(
"write() called with socket in invalid state"));
2810 os << static_cast<int>(
state);
2821 }
catch (
const std::exception&) {
2826 return s +
" (peer=" + peer.
describe() +
", local=" + local.
describe() +
")";
virtual void readBufferAvailable(std::unique_ptr< IOBuf >) noexcept
void setZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void scheduleImmediateRead() noexcept
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wSS)
bool getTFOSucceded() const
virtual ssize_t tfoSendMsg(int fd, struct msghdr *msg, int msg_flags)
static BytesWriteRequest * newRequest(AsyncSocket *socket, WriteCallback *callback, const iovec *ops, uint32_t opCount, uint32_t partialWritten, uint32_t bytesWritten, unique_ptr< IOBuf > &&ioBuf, WriteFlags flags)
const struct iovec * getOps() const
friend std::ostream & operator<<(std::ostream &os, const StateEnum &state)
std::unordered_map< uint32_t, folly::IOBuf * > idZeroCopyBufPtrMap_
#define SO_EE_CODE_ZEROCOPY_COPIED
#define FOLLY_GNU_DISABLE_WARNING(warningName)
virtual void readDataAvailable(size_t len) noexcept=0
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
#define FOLLY_POP_WARNING
uint32_t opCount_
number of entries in writeOps_
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
bool connecting() const override
void shutdownWriteNow() override
void setFromPeerAddress(int socket)
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
ssize_t sendmsg(NetworkSocket socket, const msghdr *message, int flags)
virtual bool hangup() const
virtual void connectSuccess() noexcept=0
std::string withAddr(const std::string &s)
int setSockOpt(int level, int optname, const T *optval)
#define FOLLY_PUSH_WARNING
int setTCPProfile(int profd)
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
bool isZeroCopyRequest(WriteFlags flags)
bool containsZeroCopyBuf(folly::IOBuf *ptr)
std::string sformat(StringPiece fmt, Args &&...args)
virtual bool isComplete()=0
bool writable() const override
WriteFlags flags_
set for WriteFlags
bool processZeroCopyWriteInProgress() noexcept
virtual WriteResult performWrite(const iovec *vec, uint32_t count, WriteFlags flags, uint32_t *countWritten, uint32_t *partialWritten)
void writev(WriteCallback *callback, const iovec *vec, size_t count, WriteFlags flags=WriteFlags::NONE) override
void cacheLocalAddress() const
int setSendBufSize(size_t bufsize)
void invokeAllErrors(const AsyncSocketException &ex)
ssize_t bytesWritten_
bytes written altogether
bool isZeroCopyWriteInProgress() const noexcept
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
virtual void errMessageError(const AsyncSocketException &ex) noexcept=0
uint32_t sendTimeout_
The send timeout, in milliseconds.
uint32_t getDestructorGuardCount() const
virtual void connectErr(const AsyncSocketException &ex) noexcept=0
virtual void handleConnect() noexcept
socklen_t getActualSize() const
std::unique_ptr< EvbChangeCallback > evbChangeCb_
void cachePeerAddress() const
std::unique_ptr< folly::IOBuf > move()
shutdownWrite() called, but we are still waiting on writes to drain
virtual void setErrMessageCB(ErrMessageCallback *callback)
constexpr bool kOpenSslModeMoveBufferOwnership
bool isZeroCopyMsg(const cmsghdr &cmsg) const
size_t pullAtMost(void *buf, size_t len)
—— Concurrent Priority Queue Implementation ——
EventBase * eventBase_
The EventBase.
folly::SocketAddress addr_
The address we tried to connect to.
std::string describe() const
int8_t readErr_
The read error encountered, if any.
void attachEventBase(EventBase *eventBase)
void failErrMessageRead(const char *fn, const AsyncSocketException &ex)
requires E e noexcept(noexcept(s.error(std::move(e))))
WriteRequest * getNext() const
virtual void prepareReadBuffer(void **buf, size_t *buflen)
std::chrono::steady_clock::time_point connectStartTime_
AsyncSocket * socket_
parent socket
virtual void setSendMsgParamCB(SendMsgParamsCallback *callback)
bool updateEventRegistration()
void scheduleInitialReadWrite() noexcept
WriteRequest * writeReqTail_
End of WriteRequest chain.
WriteRequest * writeReqHead_
Chain of WriteRequests.
bool error() const override
void failAllWrites(const AsyncSocketException &ex)
uint8_t shutdownFlags_
Shutdown state (ShutdownFlags)
void writeChainImpl(WriteCallback *callback, iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags)
SendMsgParamsCallback * sendMsgParamCallback_
< Callback for retrieving
int setRecvBufSize(size_t bufsize)
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds connectTimeout_
void fail(const char *fn, const AsyncSocketException &ex)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
bool isSet(WriteFlags a, WriteFlags b)
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
void attachEventBase(EventBase *eventBase) override
size_t zeroCopyReenableCounter_
void writeChain(WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
BytesWriteRequest(AsyncSocket *socket, WriteCallback *callback, const struct iovec *ops, uint32_t opCount, uint32_t partialBytes, uint32_t bytesWritten, unique_ptr< IOBuf > &&ioBuf, WriteFlags flags)
virtual void handleRead() noexcept
void registerForConnectEvents()
virtual void onEgressBufferCleared()=0
virtual void writeRequest(WriteRequest *req)
bool isLoopCallbackScheduled() const
struct iovec writeOps_[]
write operation(s) list
virtual void onEgressBuffered()=0
#define FOLLY_HAVE_VLA_01
bool setZeroCopy(bool enable)
std::map< OptionKey, int > OptionMap
const AsyncSocketException socketShutdownForWritesEx(AsyncSocketException::END_OF_FILE,"socket shutdown for writes")
ImmediateReadCB immediateReadHandler_
LoopCallback for checking read.
folly::SocketAddress localAddr_
The address we are connecting from.
void invalidState(ConnectCallback *callback)
void append(WriteRequest *next)
void processZeroCopyMsg(const cmsghdr &cmsg)
void ioReady(uint16_t events) noexcept
size_t appBytesReceived_
Num of bytes received from socket.
sa_family_t getFamily() const
void cancelLoopCallback()
SocketAddress getPeerAddress() const
static const size_t maxAncillaryDataSize
static const folly::SocketAddress & anyAddress()
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
IoHandler ioHandler_
A EventHandler to monitor the fd.
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
virtual void getAncillaryData(folly::WriteFlags, void *) noexcept
void detachEventBase() override
constexpr auto data(C &c) -> decltype(c.data())
uint16_t eventFlags_
EventBase::HandlerFlags settings.
uint32_t opsWritten_
complete ops written
bool isInitialized() const
void failRead(const char *fn, const AsyncSocketException &ex)
bool good() const override
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
size_t zeroCopyReenableThreshold_
void timeoutExpired() noexcept
void closeWithReset() override
virtual ErrMessageCallback * getErrMessageCallback() const
static const OptionMap emptyOptionMap
ErrMessageCallback * errMessageCallback_
TimestampCallback.
ShutdownSocketSet shutdownSocketSet
WriteTimeout writeTimeout_
A timeout for connect and write.
int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
void failConnect(const char *fn, const AsyncSocketException &ex)
NetworkSocket socket(int af, int type, int protocol)
socklen_t getAddress(sockaddr_storage *addr) const
virtual ReadResult performRead(void **buf, size_t *buflen, size_t *offset)
unique_ptr< IOBuf > ioBuf_
underlying IOBuf, or nullptr if N/A
static constexpr bool msgErrQueueSupported
uint32_t getNextZeroCopyBufId()
virtual void handleWrite() noexcept
void dcheckIsInEventBaseThread() const
virtual WriteResult performWrite()=0
uint32_t opIndex_
current index into writeOps_
uint16_t maxReadsPerEvent_
Max reads per event loop iteration.
uint32_t totalBytesWritten_
total bytes written
void attachEventBase(EventBase *eventBase, InternalEnum internal=InternalEnum::NORMAL)
std::chrono::steady_clock::time_point connectEndTime_
#define SO_EE_ORIGIN_ZEROCOPY
void changeHandlerFD(int fd)
virtual size_t handleErrMessages() noexcept
fbstring errnoStr(int err)
virtual void scheduleConnectTimeout()
SocketAddress getLocalAddress() const
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
std::unordered_map< folly::IOBuf *, IOBufInfo > idZeroCopyBufInfoMap_
void bytesWritten(size_t count)
ReadCallback * readCallback_
ReadCallback.
void setReadCB(ReadCallback *callback) override
static std::unique_ptr< IOBuf > takeOwnership(void *buf, std::size_t capacity, FreeFunction freeFn=nullptr, void *userData=nullptr, bool freeOnError=true)
~BytesWriteRequest() override=default
void setFromLocalAddress(int socket)
uint32_t getTotalBytesWritten() const
WriteFlags unSet(WriteFlags a, WriteFlags b)
StateEnum state_
StateEnum describing current state.
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
BufferCallback * bufferCallback_
void setZeroCopyReenableThreshold(size_t threshold)
virtual void invokeConnectSuccess()
EventBase * getEventBase() const override
void addZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void trimStart(size_t amount)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
int fd_
The socket file descriptor.
bool scheduleTimeout(uint32_t milliseconds)
bool readable() const override
void setBufferCallback(BufferCallback *cb)
virtual void writeSuccess() noexcept=0
bool isComplete() override
virtual uint32_t getAncillaryDataSize(folly::WriteFlags) noexcept
void writeImpl(WriteCallback *callback, const iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE)
bool isPending() const override
virtual SendMsgParamsCallback * getSendMsgParamsCB() const
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
void failWrite(const char *fn, WriteCallback *callback, size_t bytesWritten, const AsyncSocketException &ex)
writes have been completely shut down
virtual void checkForImmediateRead() noexcept
AsyncSocket::WriteResult sendSocketMessage(int fd, struct msghdr *msg, int msg_flags)
ssize_t recv(NetworkSocket s, void *buf, size_t len, int flags)
ReadCallback * getReadCallback() const override
const AsyncSocketException socketClosedLocallyEx(AsyncSocketException::END_OF_FILE,"socket closed locally")
bool isDetachable() const override
ssize_t tfo_sendmsg(int, const struct msghdr *, int)
virtual void handleInitialReadWrite() noexcept
int socketConnect(const struct sockaddr *addr, socklen_t len)
WriteResult performWrite() override
int setCongestionFlavor(const std::string &cname)
virtual void readEOF() noexcept=0
bool registerHandler(uint16_t events)
virtual void errMessage(const cmsghdr &cmsg) noexcept=0
ThreadPoolListHook * addr
void shutdownWrite() override
size_t appBytesWritten_
Num of bytes written to socket.
int setNoDelay(bool noDelay)
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
void adjustZeroCopyFlags(folly::WriteFlags &flags)
WriteCallback * getCallback() const
virtual void invokeConnectErr(const AsyncSocketException &ex)
int setQuickAck(bool quickack)
ConnectCallback * connectCallback_
ConnectCallback.
uint32_t partialBytes_
partial bytes of incomplete op written
uint32_t getZeroCopyBufId() const
int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
void releaseZeroCopyBuf(uint32_t id)
uint32_t getOpCount() const
std::unique_ptr< IOBuf > preReceivedData_
virtual void readErr(const AsyncSocketException &ex) noexcept=0