proxygen
AsyncUDPSocket.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <folly/Likely.h>
24 
25 #include <errno.h>
26 
27 // Due to the way kernel headers are included, this may or may not be defined.
28 // Number pulled from 3.10 kernel headers.
29 #ifndef SO_REUSEPORT
30 #define SO_REUSEPORT 15
31 #endif
32 
34 
35 namespace folly {
36 
// Constructor initializer list and body; the signature line is not present in
// this listing. Initializes the EventHandler base with `evb` (CHECK-fails on
// null), starts with no read callback, remembers the event base, and
// default-constructs fd_ (the "no socket attached" state that the rest of the
// file tests as fd_ == NetworkSocket()).
38  : EventHandler(CHECK_NOTNULL(evb)),
39  readCallback_(nullptr),
40  eventBase_(evb),
41  fd_() {
// NOTE(review): listing line 42 is missing here; confirm the constructor body
// against the upstream file.
43 }
44 
// Body of what is presumably the destructor (signature line missing from this
// listing; confirm): closes the socket if one is still attached.
46  if (fd_ != NetworkSocket()) {
47  close();
48  }
49 }
50 
53  netops::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
54  if (socket == NetworkSocket()) {
57  "error creating async udp socket",
58  errno);
59  }
60 
61  auto g = folly::makeGuard([&] { netops::close(socket); });
62 
63  // put the socket in non-blocking mode
64  int ret = netops::set_socket_non_blocking(socket);
65  if (ret != 0) {
68  "failed to put socket in non-blocking mode",
69  errno);
70  }
71 
72  if (reuseAddr_) {
73  // put the socket in reuse mode
74  int value = 1;
76  socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) != 0) {
79  "failed to put socket in reuse mode",
80  errno);
81  }
82  }
83 
84  if (reusePort_) {
85  // put the socket in port reuse mode
86  int value = 1;
88  socket, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value)) != 0) {
91  "failed to put socket in reuse_port mode",
92  errno);
93  }
94  }
95 
96  if (busyPollUs_ > 0) {
97 #ifdef SO_BUSY_POLL
98  // Set busy_poll time in microseconds on the socket.
99  // It sets how long socket will be in busy_poll mode when no event occurs.
100  int value = busyPollUs_;
101  if (netops::setsockopt(
102  socket, SOL_SOCKET, SO_BUSY_POLL, &value, sizeof(value)) != 0) {
103  throw AsyncSocketException(
105  "failed to set SO_BUSY_POLL on the socket",
106  errno);
107  }
108 #else /* SO_BUSY_POLL is not supported*/
109  throw AsyncSocketException(
110  AsyncSocketException::NOT_OPEN, "SO_BUSY_POLL is not supported", errno);
111 #endif
112  }
113 
114  if (rcvBuf_ > 0) {
115  // Set the size of the buffer for the received messages in rx_queues.
116  int value = rcvBuf_;
117  if (netops::setsockopt(
118  socket, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) != 0) {
119  throw AsyncSocketException(
121  "failed to set SO_RCVBUF on the socket",
122  errno);
123  }
124  }
125 
126  if (sndBuf_ > 0) {
127  // Set the size of the buffer for the sent messages in tx_queues.
128  int value = rcvBuf_;
129  if (netops::setsockopt(
130  socket, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) != 0) {
131  throw AsyncSocketException(
133  "failed to set SO_SNDBUF on the socket",
134  errno);
135  }
136  }
137 
138  // If we're using IPv6, make sure we don't accept V4-mapped connections
139  if (address.getFamily() == AF_INET6) {
140  int flag = 1;
141  if (netops::setsockopt(
142  socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
143  throw AsyncSocketException(
144  AsyncSocketException::NOT_OPEN, "Failed to set IPV6_V6ONLY", errno);
145  }
146  }
147 
148  // bind to the address
149  sockaddr_storage addrStorage;
150  address.getAddress(&addrStorage);
151  auto& saddr = reinterpret_cast<sockaddr&>(addrStorage);
152  if (netops::bind(socket, &saddr, address.getActualSize()) != 0) {
153  throw AsyncSocketException(
155  "failed to bind the async udp socket for:" + address.describe(),
156  errno);
157  }
158 
159  // success
160  g.dismiss();
161  fd_ = socket;
163 
164  // attach to EventHandler
166 
167  if (address.getPort() != 0) {
169  } else {
171  }
172 }
173 
// dontFragment(df): selects the path-MTU-discovery mode for the socket.
// df == true picks IP_PMTUDISC_DO / IPV6_PMTUDISC_DO, df == false picks
// IP_PMTUDISC_WANT / IPV6_PMTUDISC_WANT. The option is applied only for the
// family reported by address() and only where the platform defines the
// constants; otherwise the call does nothing.
// Throws AsyncSocketException (with errno) if setsockopt fails.
// NOTE(review): the signature line and one argument line of each throw are
// missing from this listing.
175  (void)df; // to avoid potential unused variable warning
176 #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO) && \
177  defined(IP_PMTUDISC_WANT)
178  if (address().getFamily() == AF_INET) {
179  int v4 = df ? IP_PMTUDISC_DO : IP_PMTUDISC_WANT;
180  if (netops::setsockopt(fd_, IPPROTO_IP, IP_MTU_DISCOVER, &v4, sizeof(v4))) {
181  throw AsyncSocketException(
183  "Failed to set DF with IP_MTU_DISCOVER",
184  errno);
185  }
186  }
187 #endif
188 #if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO) && \
189  defined(IPV6_PMTUDISC_WANT)
190  if (address().getFamily() == AF_INET6) {
191  int v6 = df ? IPV6_PMTUDISC_DO : IPV6_PMTUDISC_WANT;
192  if (netops::setsockopt(
193  fd_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &v6, sizeof(v6))) {
194  throw AsyncSocketException(
196  "Failed to set DF with IPV6_MTU_DISCOVER",
197  errno);
198  }
199  }
200 #endif
201 }
202 
// setErrMessageCallback(errMessageCallback): stores the callback and switches
// IP_RECVERR / IPV6_RECVERR on the socket on (callback non-null) or off
// (callback null), for the family reported by address() and only where the
// platform defines the option.
// Throws AsyncSocketException (NOT_OPEN, with errno) if setsockopt fails.
// NOTE(review): the first line of the signature is missing from this listing.
204  ErrMessageCallback* errMessageCallback) {
205  errMessageCallback_ = errMessageCallback;
// err is 1 when a callback is installed and 0 otherwise; it is the option value.
206  int err = (errMessageCallback_ != nullptr);
207 #if defined(IP_RECVERR)
208  if (address().getFamily() == AF_INET &&
209  netops::setsockopt(fd_, IPPROTO_IP, IP_RECVERR, &err, sizeof(err))) {
210  throw AsyncSocketException(
211  AsyncSocketException::NOT_OPEN, "Failed to set IP_RECVERR", errno);
212  }
213 #endif
214 #if defined(IPV6_RECVERR)
215  if (address().getFamily() == AF_INET6 &&
216  netops::setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVERR, &err, sizeof(err))) {
217  throw AsyncSocketException(
218  AsyncSocketException::NOT_OPEN, "Failed to set IPV6_RECVERR", errno);
219  }
220 #endif
// Silences the unused-variable warning when neither option is available.
221  (void)err;
222 }
223 
// setFD(fd, ownership): adopts an externally created socket. CHECK-fails if a
// socket is already attached, then records the descriptor and its ownership
// mode.
// NOTE(review): the signature line and listing lines 230-231 are missing
// here; confirm the remaining statements against the upstream file.
225  CHECK_EQ(NetworkSocket(), fd_) << "Already bound to another FD";
226 
227  fd_ = fd;
228  ownership_ = ownership;
229 
232 }
233 
// writeGSO(address, buf, gso): sends the IOBuf chain `buf` to `address` by
// converting it to an iovec array and delegating to writev(), passing `gso`
// through. Returns whatever writev() returns.
// NOTE(review): the first two signature lines are missing from this listing.
236  const std::unique_ptr<folly::IOBuf>& buf,
237  int gso) {
238  // UDP's typical MTU size is 1500, so high number of buffers
239  // really do not make sense. Optimize for buffer chains with
240  // buffers less than 16, which is the highest I can think of
241  // for a real use case.
242  iovec vec[16];
243  size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0]));
// fillIov() produced no entries (presumably the chain did not fit in vec;
// confirm against IOBuf::fillIov): fall back to one coalesced buffer.
244  if (UNLIKELY(iovec_len == 0)) {
245  buf->coalesce();
246  vec[0].iov_base = const_cast<uint8_t*>(buf->data());
247  vec[0].iov_len = buf->length();
248  iovec_len = 1;
249  }
250 
251  return writev(address, vec, iovec_len, gso);
252 }
253 
// write(address, buf): convenience wrapper that forwards to writeGSO() with
// gso = 0.
// NOTE(review): the first two signature lines are missing from this listing.
256  const std::unique_ptr<folly::IOBuf>& buf) {
257  return writeGSO(address, buf, 0);
258 }
259 
// writev(address, vec, iovec_len, gso): sends the iovec array to `address`
// with sendmsg() and returns its result. CHECK-fails if no socket is attached.
// When FOLLY_HAVE_MSG_ERRQUEUE is defined and gso > 0, a SOL_UDP/UDP_SEGMENT
// control message carrying gso as a 16-bit value is attached; without that
// macro, gso must be < 1 (CHECK).
// NOTE(review): the first two signature lines are missing from this listing.
262  const struct iovec* vec,
263  size_t iovec_len,
264  int gso) {
265  CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";
266 
267  sockaddr_storage addrStorage;
268  address.getAddress(&addrStorage);
269 
270  struct msghdr msg;
271  msg.msg_name = reinterpret_cast<void*>(&addrStorage);
272  msg.msg_namelen = address.getActualSize();
273  msg.msg_iov = const_cast<struct iovec*>(vec);
274  msg.msg_iovlen = iovec_len;
275  msg.msg_control = nullptr;
276  msg.msg_controllen = 0;
277  msg.msg_flags = 0;
278 
279 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
280  if (gso > 0) {
// `control` is local to this scope and msg points into it, so the send has to
// happen before the scope ends; hence the separate sendmsg() call below.
281  char control[CMSG_SPACE(sizeof(uint16_t))];
282  msg.msg_control = control;
283  msg.msg_controllen = sizeof(control);
284 
285  struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
286  cm->cmsg_level = SOL_UDP;
287  cm->cmsg_type = UDP_SEGMENT;
288  cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
// gso is narrowed to 16 bits here without a range check.
289  uint16_t gso_len = static_cast<uint16_t>(gso);
290  memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
291 
292  return sendmsg(fd_, &msg, 0);
293  }
294 #else
295  CHECK_LT(gso, 1) << "GSO not supported";
296 #endif
297 
298  return sendmsg(fd_, &msg, 0);
299 }
300 
// writev(address, vec, iovec_len): convenience overload that forwards to the
// four-argument writev() with gso = 0.
// NOTE(review): the first two signature lines are missing from this listing.
303  const struct iovec* vec,
304  size_t iovec_len) {
305  return writev(address, vec, iovec_len, 0);
306 }
// resumeRead(cob): installs `cob` as the read callback and updates the event
// registration. CHECK-fails if a callback is already installed, if no socket
// is attached, or if `cob` is null. If registration fails, the callback is
// cleared again and the failure is reported through cob->onReadError(ex).
// NOTE(review): the signature line and listing line 314 (the start of the
// declaration of `ex`) are missing from this listing.
308  CHECK(!readCallback_) << "Another read callback already installed";
309  CHECK_NE(NetworkSocket(), fd_)
310  << "UDP server socket not yet bind to an address";
311 
312  readCallback_ = CHECK_NOTNULL(cob);
313  if (!updateRegistration()) {
315  AsyncSocketException::NOT_OPEN, "failed to register for accept events");
316 
317  readCallback_ = nullptr;
318  cob->onReadError(ex);
319  return;
320  }
321 }
322 
// pauseRead(): clears the read callback.
// NOTE(review): the signature line and listing line 326 are missing here;
// presumably the registration is refreshed after the callback is cleared.
// Confirm against the upstream file.
324  // It is ok to pause an already paused socket
325  readCallback_ = nullptr;
327 }
328 
// Body of what is presumably close() (confirm): tells the installed read
// callback, if any, that reading has ended via onReadClosed(), then resets
// fd_ to the "no socket" value.
// NOTE(review): the signature line and listing lines 330, 340, 342 and 343
// are missing here, including the code that precedes the stray closing brace
// on line 344; confirm them against the upstream file.
331 
332  if (readCallback_) {
// Clear the member before invoking the callback, so the object no longer
// holds it while onReadClosed() runs.
333  auto cob = readCallback_;
334  readCallback_ = nullptr;
335 
336  cob->onReadClosed();
337  }
338 
339  // Unregister any events we are registered for
341 
344  }
345 
346  fd_ = NetworkSocket();
347 }
348 
// handlerReady(events): event-loop entry point. On a READ event it DCHECKs
// that a read callback is installed and dispatches to handleRead(); other
// event bits are ignored.
// NOTE(review): the signature line is missing from this listing.
350  if (events & EventHandler::READ) {
351  DCHECK(readCallback_);
352  handleRead();
353  }
354 }
355 
357 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
358  if (errMessageCallback_ == nullptr) {
359  return 0;
360  }
361  uint8_t ctrl[1024];
362  unsigned char data;
363  struct msghdr msg;
364  iovec entry;
365 
366  entry.iov_base = &data;
367  entry.iov_len = sizeof(data);
368  msg.msg_iov = &entry;
369  msg.msg_iovlen = 1;
370  msg.msg_name = nullptr;
371  msg.msg_namelen = 0;
372  msg.msg_control = ctrl;
373  msg.msg_controllen = sizeof(ctrl);
374  msg.msg_flags = 0;
375 
376  int ret;
377  size_t num = 0;
378  while (fd_ != NetworkSocket()) {
379  ret = netops::recvmsg(fd_, &msg, MSG_ERRQUEUE);
380  VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
381 
382  if (ret < 0) {
383  if (errno != EAGAIN) {
384  auto errnoCopy = errno;
385  LOG(ERROR) << "::recvmsg exited with code " << ret
386  << ", errno: " << errnoCopy;
389  "recvmsg() failed",
390  errnoCopy);
391  failErrMessageRead(ex);
392  }
393  return num;
394  }
395 
396  for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
397  cmsg != nullptr && cmsg->cmsg_len != 0;
398  cmsg = CMSG_NXTHDR(&msg, cmsg)) {
399  ++num;
401  if (fd_ == NetworkSocket()) {
402  // once the socket is closed there is no use for more read errors.
403  return num;
404  }
405  }
406  }
407  return num;
408 #else
409  return 0;
410 #endif
411 }
412 
// failErrMessageRead(ex): if an error-message callback is installed, clears
// it and reports `ex` through errMessageError().
// NOTE(review): the signature line and listing line 415 are missing here;
// line 415 presumably saves errMessageCallback_ into the local `callback`
// used below. Confirm against the upstream file.
414  if (errMessageCallback_ != nullptr) {
416  errMessageCallback_ = nullptr;
417  callback->errMessageError(ex);
418  }
419 }
420 
// connect(address): connects the attached socket to `address` and returns the
// result of netops::connect() unchanged. CHECK-fails if no socket is
// attached.
// NOTE(review): the signature line is missing from this listing.
422  CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";
423  sockaddr_storage addrStorage;
424  address.getAddress(&addrStorage);
425  return netops::connect(
426  fd_, reinterpret_cast<sockaddr*>(&addrStorage), address.getActualSize());
427 }
428 
// handleRead(): services one READ event.
//  1. Drains the error queue first; if any error message was handled, returns
//     without reading data.
//  2. Returns if the error callbacks closed the socket.
//  3. Asks the read callback for a buffer; an empty buffer is reported through
//     onReadError() and the callback is uninstalled.
//  4. Reads one datagram with recvfrom(). On success the sender is stored in
//     clientAddress_. On EAGAIN/EWOULDBLOCK it returns silently. On any other
//     error the callback is uninstalled and onReadError() is invoked.
// NOTE(review): the signature line and listing lines 444-445, 452, 474, 483
// and 493 are missing here (the starts of the two `ex` declarations, the call
// that delivers the data, and a statement after each onReadError); confirm
// them against the upstream file.
430  void* buf{nullptr};
431  size_t len{0};
432 
433  if (handleErrMessages()) {
434  return;
435  }
436 
437  if (fd_ == NetworkSocket()) {
438  // The socket may have been closed by the error callbacks.
439  return;
440  }
441 
442  readCallback_->getReadBuffer(&buf, &len);
443  if (buf == nullptr || len == 0) {
446  "AsyncUDPSocket::getReadBuffer() returned empty buffer");
447 
448  auto cob = readCallback_;
449  readCallback_ = nullptr;
450 
451  cob->onReadError(ex);
453  return;
454  }
455 
456  struct sockaddr_storage addrStorage;
457  socklen_t addrLen = sizeof(addrStorage);
458  memset(&addrStorage, 0, size_t(addrLen));
459  struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
// Pre-seed the family from the local address before recvfrom() fills it in.
460  rawAddr->sa_family = localAddress_.getFamily();
461 
// MSG_TRUNC is passed so the return value can exceed len; that case is
// detected below and reported as a truncated datagram.
462  ssize_t bytesRead =
463  netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
464  if (bytesRead >= 0) {
465  clientAddress_.setFromSockaddr(rawAddr, addrLen);
466 
467  if (bytesRead > 0) {
468  bool truncated = false;
469  if ((size_t)bytesRead > len) {
470  truncated = true;
// Only len bytes were actually stored in buf.
471  bytesRead = ssize_t(len);
472  }
473 
475  clientAddress_, size_t(bytesRead), truncated);
476  }
477  } else {
478  if (errno == EAGAIN || errno == EWOULDBLOCK) {
479  // No data could be read without blocking the socket
480  return;
481  }
482 
484  AsyncSocketException::INTERNAL_ERROR, "::recvfrom() failed", errno);
485 
486  // In case of UDP we can continue reading from the socket
487  // even if the current request fails. We notify the user
488  // so that he can do some logging/stats collection if he wants.
489  auto cob = readCallback_;
490  readCallback_ = nullptr;
491 
492  cob->onReadError(ex);
494  }
495 }
496 
// updateRegistration(): re-registers the handler with the event loop. READ is
// requested only while a read callback is installed; PERSIST is always set.
// Returns the result of registerHandler().
// NOTE(review): the signature line is missing from this listing.
498  uint16_t flags = NONE;
499 
500  if (readCallback_) {
501  flags |= READ;
502  }
503 
504  return registerHandler(uint16_t(flags | PERSIST));
505 }
506 
// Presumably setGSO(val) (signature line missing from this listing; confirm):
// sets the UDP_SEGMENT socket option to `val` and caches the outcome in gso_
// (val on success, -1 on failure). Returns true on success. Without
// FOLLY_HAVE_MSG_ERRQUEUE it does nothing and returns false.
508 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
509  int ret = netops::setsockopt(fd_, SOL_UDP, UDP_SEGMENT, &val, sizeof(val));
510 
511  gso_ = ret ? -1 : val;
512 
513  return !ret;
514 #else
515  (void)val;
516  return false;
517 #endif
518 }
519 
// Presumably getGSO() (signature line missing from this listing; confirm):
// returns the cached UDP_SEGMENT value from gso_. On the first call, when
// gso_ is still empty, it queries the socket with getsockopt() and caches the
// answer. The cached value is -1 if the query fails or if
// FOLLY_HAVE_MSG_ERRQUEUE is not defined.
521  // check if we can return the cached value
522  if (FOLLY_UNLIKELY(!gso_.hasValue())) {
523 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
524  int gso = -1;
525  socklen_t optlen = sizeof(gso);
526  if (!netops::getsockopt(fd_, SOL_UDP, UDP_SEGMENT, &gso, &optlen)) {
527  gso_ = gso;
528  } else {
529  gso_ = -1;
530  }
531 #else
532  gso_ = -1;
533 #endif
534  }
535 
536  return gso_.value();
537 }
538 
// Presumably the tail of detachEventBase() (confirm): forgets the current
// event base.
// NOTE(review): the signature line and listing lines 540, 541 and 543 are
// missing here; confirm the rest of the body against the upstream file.
542  eventBase_ = nullptr;
544 }
545 
// attachEventBase(evb): adopts `evb` as the socket's event base. DCHECKs that
// no event base is currently attached and that the caller runs in evb's
// thread.
// NOTE(review): the signature line and listing lines 550-551 are missing
// here; confirm the remaining statements against the upstream file.
547  DCHECK(!eventBase_);
548  DCHECK(evb && evb->isInEventBaseThread());
549  eventBase_ = evb;
552 }
553 
554 } // namespace folly
ReadCallback * readCallback_
virtual const folly::SocketAddress & address() const
void setFD(int fd, FDOwnership ownership)
ssize_t recvfrom(NetworkSocket s, void *buf, size_t len, int flags, sockaddr *from, socklen_t *fromlen)
Definition: NetOps.cpp:207
flags
Definition: http_parser.h:127
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
virtual void onReadError(const AsyncSocketException &ex) noexcept=0
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
virtual void pauseRead()
ByteRange coalesce()
Definition: IOBuf.h:1095
virtual ssize_t writeGSO(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf, int gso)
virtual void resumeRead(ReadCallback *cob)
virtual ssize_t writev(const folly::SocketAddress &address, const struct iovec *vec, size_t veclen, int gso)
socklen_t getActualSize() const
virtual void onReadClosed() noexcept=0
void setFromSockaddr(const struct sockaddr *address)
double val
Definition: String.cpp:273
const uint8_t * data() const
Definition: IOBuf.h:499
#define FOLLY_UNLIKELY(x)
Definition: Likely.h:36
virtual void setErrMessageCallback(ErrMessageCallback *errMessageCallback)
void failErrMessageRead(const AsyncSocketException &ex)
uint16_t getPort() const
virtual void errMessageError(const AsyncSocketException &ex) noexcept=0
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::string describe() const
void attachEventBase(EventBase *eventBase)
requires E e noexcept(noexcept(s.error(std::move(e))))
static once_flag flag
Definition: Random.cpp:75
virtual void attachEventBase(folly::EventBase *evb)
#define nullptr
Definition: http_parser.c:41
#define SOL_UDP
Definition: NetOps.h:59
virtual ssize_t sendmsg(NetworkSocket socket, const struct msghdr *message, int flags)
#define UDP_SEGMENT
Definition: NetOps.h:67
virtual ssize_t write(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf)
size_t fillIov(struct iovec *iov, size_t len) const
Definition: IOBuf.cpp:1072
virtual void detachEventBase()
bool isInEventBaseThread() const
Definition: EventBase.h:504
sa_family_t getFamily() const
#define SO_REUSEPORT
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
std::size_t length() const
Definition: IOBuf.h:533
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
Definition: Optional.h:300
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
virtual void errMessage(const cmsghdr &cmsg) noexcept=0
AsyncUDPSocket(EventBase *evb)
bool updateRegistration() noexcept
virtual void getReadBuffer(void **buf, size_t *len) noexcept=0
Definition: Traits.h:588
size_t handleErrMessages() noexcept
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
virtual void bind(const folly::SocketAddress &address)
folly::Optional< int > gso_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void changeHandlerFD(int fd)
Definition: EventHandler.h:143
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
Definition: NetOps.cpp:268
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
Definition: ScopeGuard.h:184
ErrMessageCallback * errMessageCallback_
virtual void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexcept=0
int set_socket_non_blocking(NetworkSocket s)
Definition: NetOps.cpp:441
void setFromLocalAddress(int socket)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
g_t g(f_t)
folly::SocketAddress clientAddress_
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void handlerReady(uint16_t events) noexceptoverride
vector< string > vec
Definition: StringTest.cpp:35
FOLLY_CPP14_CONSTEXPR const Value & value() const &
Definition: Optional.h:268
void handleRead() noexcept
#define UNLIKELY(x)
Definition: Likely.h:48
virtual void dontFragment(bool df)
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
folly::SocketAddress localAddress_
int close(NetworkSocket s)
Definition: NetOps.cpp:90
virtual int connect(const folly::SocketAddress &address)