30 #define SO_REUSEPORT 15 57 "error creating async udp socket",
68 "failed to put socket in non-blocking mode",
76 socket, SOL_SOCKET, SO_REUSEADDR, &value,
sizeof(value)) != 0) {
79 "failed to put socket in reuse mode",
88 socket, SOL_SOCKET,
SO_REUSEPORT, &value,
sizeof(value)) != 0) {
91 "failed to put socket in reuse_port mode",
102 socket, SOL_SOCKET, SO_BUSY_POLL, &value,
sizeof(value)) != 0) {
105 "failed to set SO_BUSY_POLL on the socket",
118 socket, SOL_SOCKET, SO_RCVBUF, &value,
sizeof(value)) != 0) {
121 "failed to set SO_RCVBUF on the socket",
130 socket, SOL_SOCKET, SO_SNDBUF, &value,
sizeof(value)) != 0) {
133 "failed to set SO_SNDBUF on the socket",
142 socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag,
sizeof(flag))) {
149 sockaddr_storage addrStorage;
151 auto& saddr =
reinterpret_cast<sockaddr&
>(addrStorage);
155 "failed to bind the async udp socket for:" + address.
describe(),
176 #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO) && \ 177 defined(IP_PMTUDISC_WANT) 179 int v4 = df ? IP_PMTUDISC_DO : IP_PMTUDISC_WANT;
183 "Failed to set DF with IP_MTU_DISCOVER",
188 #if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO) && \ 189 defined(IPV6_PMTUDISC_WANT) 190 if (
address().getFamily() == AF_INET6) {
191 int v6 = df ? IPV6_PMTUDISC_DO : IPV6_PMTUDISC_WANT;
193 fd_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &v6,
sizeof(v6))) {
196 "Failed to set DF with IPV6_MTU_DISCOVER",
207 #if defined(IP_RECVERR) 214 #if defined(IPV6_RECVERR) 215 if (
address().getFamily() == AF_INET6 &&
236 const std::unique_ptr<folly::IOBuf>& buf,
243 size_t iovec_len = buf->
fillIov(vec,
sizeof(vec) /
sizeof(vec[0]));
246 vec[0].iov_base =
const_cast<uint8_t*
>(buf->
data());
247 vec[0].iov_len = buf->
length();
251 return writev(address, vec, iovec_len, gso);
256 const std::unique_ptr<folly::IOBuf>& buf) {
262 const struct iovec*
vec,
267 sockaddr_storage addrStorage;
271 msg.msg_name =
reinterpret_cast<void*
>(&addrStorage);
273 msg.msg_iov =
const_cast<struct iovec*
>(
vec);
274 msg.msg_iovlen = iovec_len;
275 msg.msg_control =
nullptr;
276 msg.msg_controllen = 0;
279 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 281 char control[CMSG_SPACE(
sizeof(
uint16_t))];
282 msg.msg_control = control;
283 msg.msg_controllen =
sizeof(control);
285 struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
288 cm->cmsg_len = CMSG_LEN(
sizeof(
uint16_t));
290 memcpy(CMSG_DATA(cm), &gso_len,
sizeof(gso_len));
295 CHECK_LT(gso, 1) <<
"GSO not supported";
303 const struct iovec*
vec,
305 return writev(address, vec, iovec_len, 0);
308 CHECK(!
readCallback_) <<
"Another read callback already installed";
310 <<
"UDP server socket not yet bind to an address";
357 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 366 entry.iov_base = &
data;
367 entry.iov_len =
sizeof(
data);
368 msg.msg_iov = &entry;
370 msg.msg_name =
nullptr;
372 msg.msg_control = ctrl;
373 msg.msg_controllen =
sizeof(ctrl);
380 VLOG(5) <<
"AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
383 if (errno != EAGAIN) {
384 auto errnoCopy = errno;
385 LOG(ERROR) <<
"::recvmsg exited with code " << ret
386 <<
", errno: " << errnoCopy;
396 for (
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
397 cmsg !=
nullptr && cmsg->cmsg_len != 0;
398 cmsg = CMSG_NXTHDR(&msg, cmsg)) {
423 sockaddr_storage addrStorage;
443 if (buf ==
nullptr || len == 0) {
446 "AsyncUDPSocket::getReadBuffer() returned empty buffer");
456 struct sockaddr_storage addrStorage;
457 socklen_t addrLen =
sizeof(addrStorage);
458 memset(&addrStorage, 0,
size_t(addrLen));
459 struct sockaddr* rawAddr =
reinterpret_cast<sockaddr*
>(&addrStorage);
464 if (bytesRead >= 0) {
468 bool truncated =
false;
469 if ((
size_t)bytesRead > len) {
471 bytesRead = ssize_t(len);
478 if (errno == EAGAIN || errno == EWOULDBLOCK) {
508 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 523 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 525 socklen_t optlen =
sizeof(gso);
ReadCallback * readCallback_
virtual const folly::SocketAddress & address() const
void setFD(int fd, FDOwnership ownership)
ssize_t recvfrom(NetworkSocket s, void *buf, size_t len, int flags, sockaddr *from, socklen_t *fromlen)
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
virtual void onReadError(const AsyncSocketException &ex) noexcept=0
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
virtual ssize_t writeGSO(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf, int gso)
virtual void resumeRead(ReadCallback *cob)
virtual ssize_t writev(const folly::SocketAddress &address, const struct iovec *vec, size_t veclen, int gso)
socklen_t getActualSize() const
virtual void onReadClosed() noexcept=0
void setFromSockaddr(const struct sockaddr *address)
const uint8_t * data() const
#define FOLLY_UNLIKELY(x)
virtual void setErrMessageCallback(ErrMessageCallback *errMessageCallback)
void failErrMessageRead(const AsyncSocketException &ex)
virtual void errMessageError(const AsyncSocketException &ex) noexcept=0
—— Concurrent Priority Queue Implementation ——
std::string describe() const
void attachEventBase(EventBase *eventBase)
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void attachEventBase(folly::EventBase *evb)
virtual ssize_t sendmsg(NetworkSocket socket, const struct msghdr *message, int flags)
virtual ssize_t write(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf)
size_t fillIov(struct iovec *iov, size_t len) const
virtual void detachEventBase()
bool isInEventBaseThread() const
sa_family_t getFamily() const
constexpr auto data(C &c) -> decltype(c.data())
std::size_t length() const
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
virtual void errMessage(const cmsghdr &cmsg) noexcept=0
AsyncUDPSocket(EventBase *evb)
bool updateRegistration() noexcept
virtual void getReadBuffer(void **buf, size_t *len) noexcept=0
size_t handleErrMessages() noexcept
NetworkSocket socket(int af, int type, int protocol)
socklen_t getAddress(sockaddr_storage *addr) const
virtual void bind(const folly::SocketAddress &address)
folly::Optional< int > gso_
void dcheckIsInEventBaseThread() const
void changeHandlerFD(int fd)
~AsyncUDPSocket() override
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
ErrMessageCallback * errMessageCallback_
virtual void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexcept=0
int set_socket_non_blocking(NetworkSocket s)
void setFromLocalAddress(int socket)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
folly::SocketAddress clientAddress_
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void handlerReady(uint16_t events) noexceptoverride
FOLLY_CPP14_CONSTEXPR const Value & value() const &
void handleRead() noexcept
virtual void dontFragment(bool df)
bool registerHandler(uint16_t events)
folly::SocketAddress localAddress_
int close(NetworkSocket s)
virtual int connect(const folly::SocketAddress &address)