proxygen
AsyncServerSocket.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20 
22 
23 #include <folly/FileUtil.h>
24 #include <folly/Portability.h>
25 #include <folly/SocketAddress.h>
26 #include <folly/String.h>
33 
34 #include <errno.h>
35 #include <string.h>
36 #include <sys/types.h>
37 
39 
40 namespace folly {
41 
42 #ifndef TCP_SAVE_SYN
43 #define TCP_SAVE_SYN 27
44 #endif
45 
46 #ifndef TCP_SAVED_SYN
47 #define TCP_SAVED_SYN 28
48 #endif
49 
50 static constexpr bool msgErrQueueSupported =
51 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
52  true;
53 #else
54  false;
55 #endif // FOLLY_HAVE_MSG_ERRQUEUE
56 
60 
/**
 * Set or clear the FD_CLOEXEC flag on a file descriptor, leaving every other
 * descriptor flag untouched.
 *
 * @param fd     Descriptor whose flags are modified.
 * @param value  Non-zero sets FD_CLOEXEC; zero clears it.
 * @return -1 if the current flags cannot be read; 0 if the flag already has
 *         the requested state; otherwise the result of fcntl(F_SETFD)
 *         (-1 on failure, with errno set by fcntl()).
 */
int setCloseOnExec(int fd, int value) {
  // Read the current descriptor flags; fail early on an unusable descriptor.
  const int oldFlags = fcntl(fd, F_GETFD, 0);
  if (oldFlags < 0) {
    return -1;
  }

  // Change only the FD_CLOEXEC bit.
  const int newFlags =
      (value != 0) ? (oldFlags | FD_CLOEXEC) : (oldFlags & ~FD_CLOEXEC);

  // Nothing to store when the flag is already in the requested state.
  if (newFlags == oldFlags) {
    return 0;
  }

  // Store the modified flag word in the descriptor.
  return fcntl(fd, F_SETFD, newFlags);
}
81 
83  EventBase* eventBase,
84  uint32_t maxAtOnce,
85  uint32_t maxInQueue) {
86  setMaxReadAtOnce(maxAtOnce);
87  queue_.setMaxQueueSize(maxInQueue);
88 
89  if (!eventBase->runInEventBaseThread([=]() {
90  callback_->acceptStarted();
91  this->startConsuming(eventBase, &queue_);
92  })) {
93  throw std::invalid_argument(
94  "unable to start waiting on accept "
95  "notification queue in the specified "
96  "EventBase thread");
97  }
98 }
99 
101  EventBase* eventBase,
102  AcceptCallback* callback) {
103  if (!eventBase->runInEventBaseThread([=]() {
104  callback->acceptStopped();
105  delete this;
106  })) {
107  throw std::invalid_argument(
108  "unable to start waiting on accept "
109  "notification queue in the specified "
110  "EventBase thread");
111  }
112 }
113 
115  QueueMessage&& msg) noexcept {
116  switch (msg.type) {
120  msg.fd, msg.address);
121  }
122  callback_->connectionAccepted(msg.fd, msg.address);
123  break;
124  }
125  case MessageType::MSG_ERROR: {
126  std::runtime_error ex(msg.msg);
127  callback_->acceptError(ex);
128  break;
129  }
130  default: {
131  LOG(ERROR) << "invalid accept notification message type "
132  << int(msg.type);
133  std::runtime_error ex(
134  "received invalid accept notification message type");
135  callback_->acceptError(ex);
136  }
137  }
138 }
139 
140 /*
141  * AsyncServerSocket::BackoffTimeout
142  */
144  public:
145  // Disallow copy, move, and default constructors.
146  BackoffTimeout(BackoffTimeout&&) = delete;
148  : AsyncTimeout(socket->getEventBase()), socket_(socket) {}
149 
150  void timeoutExpired() noexcept override {
151  socket_->backoffTimeoutExpired();
152  }
153 
154  private:
156 };
157 
158 /*
159  * AsyncServerSocket methods
160  */
161 
163  : eventBase_(eventBase),
164  accepting_(false),
168  acceptRate_(1),
169  lastAccepTimestamp_(std::chrono::steady_clock::now()),
171  callbackIndex_(0),
173  callbacks_(),
174  keepAliveEnabled_(true),
175  closeOnExec_(true) {
177 }
178 
180  const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
181  const auto newSS = wNewSS.lock();
182  const auto shutdownSocketSet = wShutdownSocketSet_.lock();
183 
184  if (shutdownSocketSet == newSS) {
185  return;
186  }
187 
188  if (shutdownSocketSet) {
189  for (auto& h : sockets_) {
190  shutdownSocketSet->remove(h.socket_);
191  }
192  }
193 
194  if (newSS) {
195  for (auto& h : sockets_) {
196  newSS->add(h.socket_);
197  }
198  }
199 
200  wShutdownSocketSet_ = wNewSS;
201 }
202 
204  assert(callbacks_.empty());
205 }
206 
// Stops accepting: clears accepting_, unregisters and closes/shuts down every
// listening socket (emptying sockets_), deletes the backoff timeout, and
// notifies every registered accept callback that accepting has stopped.
//
// shutdownFlags: used only when no ShutdownSocketSet is alive. If >= 0 the
//   socket is passed to shutdownNoInt() with these flags and its fd is queued
//   in pendingCloseSockets_; if negative the socket is closed with
//   closeNoInt().
// Returns 0, or the result of the most recent shutdownNoInt() call (each call
// overwrites the previous result).
//
// NOTE(review): listing line 213 (the body of the `if (eventBase_)` guard) is
// absent from this copy of the file; restore it from the upstream source.
207 int AsyncServerSocket::stopAccepting(int shutdownFlags) {
208  int result = 0;
209  for (auto& handler : sockets_) {
210  VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_;
211  }
212  if (eventBase_) {
214  }
215 
216  // When destroy is called, unregister and close the socket immediately.
217  accepting_ = false;
218 
219  // Close the sockets in reverse order as they were opened to avoid
220  // the condition where another process concurrently tries to open
221  // the same port, succeed to bind the first socket but fails on the
222  // second because it hasn't been closed yet.
223  for (; !sockets_.empty(); sockets_.pop_back()) {
224  auto& handler = sockets_.back();
225  handler.unregisterHandler();
// Prefer closing through the ShutdownSocketSet when one is still alive.
226  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
227  shutdownSocketSet->close(handler.socket_);
228  } else if (shutdownFlags >= 0) {
// Shut down now; the fd is remembered in pendingCloseSockets_ rather than
// being closed here.
229  result = shutdownNoInt(handler.socket_, shutdownFlags);
230  pendingCloseSockets_.push_back(handler.socket_);
231  } else {
232  closeNoInt(handler.socket_);
233  }
234  }
235 
236  // Destroy the backoff timeout. This will cancel it if it is running.
237  delete backoffTimeout_;
238  backoffTimeout_ = nullptr;
239 
240  // Close all of the callback queues to notify them that they are being
241  // destroyed. No one should access the AsyncServerSocket any more once
242  // destroy() is called. However, clear out callbacks_ before invoking the
243  // accept callbacks just in case. This will potentially help us detect the
244  // bug if one of the callbacks calls addAcceptCallback() or
245  // removeAcceptCallback().
246  std::vector<CallbackInfo> callbacksCopy;
247  callbacks_.swap(callbacksCopy);
248  for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
249  it != callbacksCopy.end();
250  ++it) {
251  // consumer may not be set if we are running in primary event base
252  if (it->consumer) {
253  DCHECK(it->eventBase);
254  it->consumer->stop(it->eventBase, it->callback);
255  } else {
256  DCHECK(it->callback);
257  it->callback->acceptStopped();
258  }
259  }
260 
261  return result;
262 }
263 
265  stopAccepting();
266  for (auto s : pendingCloseSockets_) {
267  closeNoInt(s);
268  }
269  // Then call DelayedDestruction::destroy() to take care of
270  // whether or not we need immediate or delayed destruction
272 }
273 
275  assert(eventBase_ == nullptr);
276  eventBase->dcheckIsInEventBaseThread();
277 
278  eventBase_ = eventBase;
279  for (auto& handler : sockets_) {
280  handler.attachEventBase(eventBase);
281  }
282 }
283 
285  assert(eventBase_ != nullptr);
287  assert(!accepting_);
288 
289  eventBase_ = nullptr;
290  for (auto& handler : sockets_) {
291  handler.detachEventBase();
292  }
293 }
294 
// Adopts the given already-created file descriptors as this server's
// listening sockets. For each fd: reads its local address to learn the address
// family, applies setupSocket(), appends a handler to sockets_ and points that
// handler at the fd.
//
// Throws std::invalid_argument if sockets_ already holds a socket.
//
// NOTE(review): listing line 297 (the body of the `if (eventBase_)` guard) is
// absent from this copy of the file; restore it from the upstream source.
295 void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
296  if (eventBase_) {
298  }
299 
300  if (sockets_.size() > 0) {
301  throw std::invalid_argument(
302  "cannot call useExistingSocket() on a "
303  "AsyncServerSocket that already has a socket");
304  }
305 
306  for (auto fd : fds) {
307  // Set addressFamily_ from this socket.
308  // Note that the socket may not have been bound yet, but
309  // setFromLocalAddress() will still work and get the correct address family.
310  // We will update addressFamily_ again anyway if bind() is called later.
311  SocketAddress address;
312  address.setFromLocalAddress(fd);
313 
314 #if __linux__
315  if (noTransparentTls_) {
316  // Ignore return value, errors are ok
317  setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
318  }
319 #endif
320 
// Configure the fd, then register it under the family read from the fd itself.
321  setupSocket(fd, address.getFamily());
322  sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
323  sockets_.back().changeHandlerFD(fd);
324  }
325 }
326 
328  useExistingSockets({fd});
329 }
330 
332  int fd,
333  const SocketAddress& address,
334  bool isExistingSocket) {
335  sockaddr_storage addrStorage;
336  address.getAddress(&addrStorage);
337  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
338 
339  if (fsp::bind(fd, saddr, address.getActualSize()) != 0) {
340  if (!isExistingSocket) {
341  closeNoInt(fd);
342  }
344  errno, "failed to bind to async server socket: " + address.describe());
345  }
346 
347 #if __linux__
348  if (noTransparentTls_) {
349  // Ignore return value, errors are ok
350  setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
351  }
352 #endif
353 
354  // If we just created this socket, update the EventHandler and set socket_
355  if (!isExistingSocket) {
356  sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
357  }
358 }
359 
361  if (msgErrQueueSupported) {
362  int fd = getSocket();
363  int val = enable ? 1 : 0;
364  int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
365 
366  return (0 == ret);
367  }
368 
369  return false;
370 }
371 
373  if (eventBase_) {
375  }
376 
377  // useExistingSocket() may have been called to initialize socket_ already.
378  // However, in the normal case we need to create a new socket now.
379  // Don't set socket_ yet, so that socket_ will remain uninitialized if an
380  // error occurs.
381  int fd;
382  if (sockets_.size() == 0) {
383  fd = createSocket(address.getFamily());
384  } else if (sockets_.size() == 1) {
385  if (address.getFamily() != sockets_[0].addressFamily_) {
386  throw std::invalid_argument(
387  "Attempted to bind address to socket with "
388  "different address family");
389  }
390  fd = sockets_[0].socket_;
391  } else {
392  throw std::invalid_argument("Attempted to bind to multiple fds");
393  }
394 
395  bindSocket(fd, address, !sockets_.empty());
396 }
397 
399  const std::vector<IPAddress>& ipAddresses,
400  uint16_t port) {
401  if (ipAddresses.empty()) {
402  throw std::invalid_argument("No ip addresses were provided");
403  }
404  if (!sockets_.empty()) {
405  throw std::invalid_argument(
406  "Cannot call bind on a AsyncServerSocket "
407  "that already has a socket.");
408  }
409 
410  for (const IPAddress& ipAddress : ipAddresses) {
411  SocketAddress address(ipAddress.toFullyQualified(), port);
412  int fd = createSocket(address.getFamily());
413 
414  bindSocket(fd, address, false);
415  }
416  if (sockets_.size() == 0) {
417  throw std::runtime_error(
418  "did not bind any async server socket for port and addresses");
419  }
420 }
421 
423  struct addrinfo hints, *res0;
424  char sport[sizeof("65536")];
425 
426  memset(&hints, 0, sizeof(hints));
427  hints.ai_family = AF_UNSPEC;
428  hints.ai_socktype = SOCK_STREAM;
429  hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
430  snprintf(sport, sizeof(sport), "%u", port);
431 
432  // On Windows the value we need to pass to bind to all available
433  // addresses is an empty string. Everywhere else, it's nullptr.
434  constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
435  if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
436  throw std::invalid_argument(
437  "Attempted to bind address to socket with "
438  "bad getaddrinfo");
439  }
440 
441  SCOPE_EXIT {
442  freeaddrinfo(res0);
443  };
444 
445  auto setupAddress = [&](struct addrinfo* res) {
446  int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
447  // IPv6/IPv4 may not be supported by the kernel
448  if (s < 0 && errno == EAFNOSUPPORT) {
449  return;
450  }
451  CHECK_GE(s, 0);
452 
453  try {
454  setupSocket(s, res->ai_family);
455  } catch (...) {
456  closeNoInt(s);
457  throw;
458  }
459 
460  if (res->ai_family == AF_INET6) {
461  int v6only = 1;
462  CHECK(
463  0 ==
464  setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)));
465  }
466 
467  // Bind to the socket
468  if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
470  errno,
471  "failed to bind to async server socket for port ",
472  SocketAddress::getPortFrom(res->ai_addr),
473  " family ",
474  SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
475  }
476 
477 #if __linux__
478  if (noTransparentTls_) {
479  // Ignore return value, errors are ok
480  setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
481  }
482 #endif
483 
484  SocketAddress address;
485  address.setFromLocalAddress(s);
486 
487  sockets_.emplace_back(eventBase_, s, this, address.getFamily());
488  };
489 
490  const int kNumTries = 25;
491  for (int tries = 1; true; tries++) {
492  // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
493  // should return IPv6 first and then IPv4 addresses, but glibc's
494  // getaddrinfo(nullptr) with AI_PASSIVE returns:
495  // - 0.0.0.0 (IPv4-only)
496  // - :: (IPv6+IPv4) in this order
497  // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
498  for (struct addrinfo* res = res0; res; res = res->ai_next) {
499  if (res->ai_family == AF_INET6) {
500  setupAddress(res);
501  }
502  }
503 
504  // If port == 0, then we should try to bind to the same port on ipv4 and
505  // ipv6. So if we did bind to ipv6, figure out that port and use it.
506  if (sockets_.size() == 1 && port == 0) {
507  SocketAddress address;
508  address.setFromLocalAddress(sockets_.back().socket_);
509  snprintf(sport, sizeof(sport), "%u", address.getPort());
510  freeaddrinfo(res0);
511  CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
512  }
513 
514  try {
515  for (struct addrinfo* res = res0; res; res = res->ai_next) {
516  if (res->ai_family != AF_INET6) {
517  setupAddress(res);
518  }
519  }
520  } catch (const std::system_error&) {
521  // If we can't bind to the same port on ipv4 as ipv6 when using
522  // port=0 then we will retry again before giving up after
523  // kNumTries attempts. We do this by closing the sockets that
524  // were opened, then restarting from scratch.
525  if (port == 0 && !sockets_.empty() && tries != kNumTries) {
526  for (const auto& socket : sockets_) {
527  if (socket.socket_ <= 0) {
528  continue;
529  } else if (
530  const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
531  shutdownSocketSet->close(socket.socket_);
532  } else {
533  closeNoInt(socket.socket_);
534  }
535  }
536  sockets_.clear();
537  snprintf(sport, sizeof(sport), "%u", port);
538  freeaddrinfo(res0);
539  CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
540  continue;
541  }
542 
543  throw;
544  }
545 
546  break;
547  }
548 
549  if (sockets_.size() == 0) {
550  throw std::runtime_error("did not bind any async server socket for port");
551  }
552 }
553 
// Calls fsp::listen() with `backlog` on every socket in sockets_.
// Throws via folly::throwSystemError(errno, ...) at the first socket for which
// fsp::listen() returns -1; sockets after it are not reached.
//
// NOTE(review): listing line 556 (the body of the `if (eventBase_)` guard) is
// absent from this copy of the file; restore it from the upstream source.
554 void AsyncServerSocket::listen(int backlog) {
555  if (eventBase_) {
557  }
558 
559  // Start listening
560  for (auto& handler : sockets_) {
561  if (fsp::listen(handler.socket_, backlog) == -1) {
562  folly::throwSystemError(errno, "failed to listen on async server socket");
563  }
564  }
565 }
566 
567 void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
568  CHECK(sockets_.size() >= 1);
569  VLOG_IF(2, sockets_.size() > 1)
570  << "Warning: getAddress() called and multiple addresses available ("
571  << sockets_.size() << "). Returning only the first one.";
572 
573  addressReturn->setFromLocalAddress(sockets_[0].socket_);
574 }
575 
576 std::vector<SocketAddress> AsyncServerSocket::getAddresses() const {
577  CHECK(sockets_.size() >= 1);
578  auto tsaVec = std::vector<SocketAddress>(sockets_.size());
579  auto tsaIter = tsaVec.begin();
580  for (const auto& socket : sockets_) {
581  (tsaIter++)->setFromLocalAddress(socket.socket_);
582  };
583  return tsaVec;
584 }
585 
587  AcceptCallback* callback,
588  EventBase* eventBase,
589  uint32_t maxAtOnce) {
590  if (eventBase_) {
592  }
593 
594  // If this is the first accept callback and we are supposed to be accepting,
595  // start accepting once the callback is installed.
596  bool runStartAccepting = accepting_ && callbacks_.empty();
597 
598  callbacks_.emplace_back(callback, eventBase);
599 
600  SCOPE_SUCCESS {
601  // If this is the first accept callback and we are supposed to be accepting,
602  // start accepting.
603  if (runStartAccepting) {
604  startAccepting();
605  }
606  };
607 
608  if (!eventBase) {
609  // Run in AsyncServerSocket's eventbase; notify that we are
610  // starting to accept connections
611  callback->acceptStarted();
612  return;
613  }
614 
615  // Start the remote acceptor.
616  //
617  // It would be nice if we could avoid starting the remote acceptor if
618  // eventBase == eventBase_. However, that would cause issues if
619  // detachEventBase() and attachEventBase() were ever used to change the
620  // primary EventBase for the server socket. Therefore we require the caller
621  // to specify a nullptr EventBase if they want to ensure that the callback is
622  // always invoked in the primary EventBase, and to be able to invoke that
623  // callback more efficiently without having to use a notification queue.
624  RemoteAcceptor* acceptor = nullptr;
625  try {
626  acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
627  acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
628  } catch (...) {
629  callbacks_.pop_back();
630  delete acceptor;
631  throw;
632  }
633  callbacks_.back().consumer = acceptor;
634 }
635 
637  AcceptCallback* callback,
638  EventBase* eventBase) {
639  if (eventBase_) {
641  }
642 
643  // Find the matching AcceptCallback.
644  // We just do a simple linear search; we don't expect removeAcceptCallback()
645  // to be called frequently, and we expect there to only be a small number of
646  // callbacks anyway.
647  std::vector<CallbackInfo>::iterator it = callbacks_.begin();
648  uint32_t n = 0;
649  while (true) {
650  if (it == callbacks_.end()) {
651  throw std::runtime_error(
652  "AsyncServerSocket::removeAcceptCallback(): "
653  "accept callback not found");
654  }
655  if (it->callback == callback &&
656  (it->eventBase == eventBase || eventBase == nullptr)) {
657  break;
658  }
659  ++it;
660  ++n;
661  }
662 
663  // Remove this callback from callbacks_.
664  //
665  // Do this before invoking the acceptStopped() callback, in case
666  // acceptStopped() invokes one of our methods that examines callbacks_.
667  //
668  // Save a copy of the CallbackInfo first.
669  CallbackInfo info(*it);
670  callbacks_.erase(it);
671  if (n < callbackIndex_) {
672  // We removed an element before callbackIndex_. Move callbackIndex_ back
673  // one step, since things after n have been shifted back by 1.
674  --callbackIndex_;
675  } else {
676  // We removed something at or after callbackIndex_.
677  // If we removed the last element and callbackIndex_ was pointing at it,
678  // we need to reset callbackIndex_ to 0.
679  if (callbackIndex_ >= callbacks_.size()) {
680  callbackIndex_ = 0;
681  }
682  }
683 
684  if (info.consumer) {
685  // consumer could be nullptr if we run callbacks in primary event
686  // base
687  DCHECK(info.eventBase);
688  info.consumer->stop(info.eventBase, info.callback);
689  } else {
690  // callback invoked in the primary event base, just call directly
691  DCHECK(info.callback);
692  callback->acceptStopped();
693  }
694 
695  // If we are supposed to be accepting but the last accept callback
696  // was removed, unregister for events until a callback is added.
697  if (accepting_ && callbacks_.empty()) {
698  for (auto& handler : sockets_) {
699  handler.unregisterHandler();
700  }
701  }
702 }
703 
705  if (eventBase_) {
707  }
708 
709  accepting_ = true;
710  if (callbacks_.empty()) {
711  // We can't actually begin accepting if no callbacks are defined.
712  // Wait until a callback is added to start accepting.
713  return;
714  }
715 
716  for (auto& handler : sockets_) {
717  if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
718  throw std::runtime_error("failed to register for accept events");
719  }
720  }
721 }
722 
724  if (eventBase_) {
726  }
727  accepting_ = false;
728  for (auto& handler : sockets_) {
729  handler.unregisterHandler();
730  }
731 
732  // If we were in the accept backoff state, disable the backoff timeout
733  if (backoffTimeout_) {
735  }
736 }
737 
739  int fd = fsp::socket(family, SOCK_STREAM, 0);
740  if (fd == -1) {
741  folly::throwSystemError(errno, "error creating async server socket");
742  }
743 
744  try {
745  setupSocket(fd, family);
746  } catch (...) {
747  closeNoInt(fd);
748  throw;
749  }
750  return fd;
751 }
752 
759  if (!kIsLinux || enable == false) {
760  tosReflect_ = false;
761  return;
762  }
763 
764  for (auto& handler : sockets_) {
765  if (handler.socket_ < 0) {
766  continue;
767  }
768 
769  int val = (enable) ? 1 : 0;
770  int ret = setsockopt(
771  handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val));
772 
773  if (ret == 0) {
774  VLOG(10) << "Enabled SYN save for socket " << handler.socket_;
775  } else {
776  folly::throwSystemError(errno, "failed to enable TOS reflect");
777  }
778  }
779  tosReflect_ = true;
780 }
781 
782 void AsyncServerSocket::setupSocket(int fd, int family) {
783  // Put the socket in non-blocking mode
784  if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
785  folly::throwSystemError(errno, "failed to put socket in non-blocking mode");
786  }
787 
788  // Set reuseaddr to avoid 2MSL delay on server restart
789  int one = 1;
790  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
791  // This isn't a fatal error; just log an error message and continue
792  LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
793  }
794 
795  // Set reuseport to support multiple accept threads
796  int zero = 0;
797  if (reusePortEnabled_ &&
798  setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
799  LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
800  << errnoStr(errno);
801 #ifdef WIN32
802  folly::throwSystemError(errno, "failed to bind to the async server socket");
803 #else
804  SocketAddress address;
805  address.setFromLocalAddress(fd);
807  errno, "failed to bind to async server socket: " + address.describe());
808 #endif
809  }
810 
811  // Set keepalive as desired
812  if (setsockopt(
813  fd,
814  SOL_SOCKET,
815  SO_KEEPALIVE,
816  (keepAliveEnabled_) ? &one : &zero,
817  sizeof(int)) != 0) {
818  LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: "
819  << errnoStr(errno);
820  }
821 
822  // Setup FD_CLOEXEC flag
823  if (closeOnExec_ && (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
824  LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: "
825  << errnoStr(errno);
826  }
827 
828  // Set TCP nodelay if available, MAC OS X Hack
829  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
830 #ifndef TCP_NOPUSH
831  if (family != AF_UNIX) {
832  if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
833  // This isn't a fatal error; just log an error message and continue
834  LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: "
835  << errnoStr(errno);
836  }
837  }
838 #else
839  (void)family; // to avoid unused parameter warning
840 #endif
841 
842 #if FOLLY_ALLOW_TFO
843  if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
844  // This isn't a fatal error; just log an error message and continue
845  LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
846  << folly::errnoStr(errno);
847  }
848 #endif
849 
850  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
851  shutdownSocketSet->add(fd);
852  }
853 }
854 
856  uint16_t /* events */,
857  int fd,
858  sa_family_t addressFamily) noexcept {
859  assert(!callbacks_.empty());
860  DestructorGuard dg(this);
861 
862  // Only accept up to maxAcceptAtOnce_ connections at a time,
863  // to avoid starving other I/O handlers using this EventBase.
864  for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
865  SocketAddress address;
866 
867  sockaddr_storage addrStorage;
868  socklen_t addrLen = sizeof(addrStorage);
869  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
870 
871  // In some cases, accept() doesn't seem to update these correctly.
872  saddr->sa_family = addressFamily;
873  if (addressFamily == AF_UNIX) {
874  addrLen = sizeof(struct sockaddr_un);
875  }
876 
877  // Accept a new client socket
878 #ifdef SOCK_NONBLOCK
879  int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
880 #else
881  int clientSocket = accept(fd, saddr, &addrLen);
882 #endif
883 
884  address.setFromSockaddr(saddr, addrLen);
885 
886  if (clientSocket >= 0 && connectionEventCallback_) {
887  connectionEventCallback_->onConnectionAccepted(clientSocket, address);
888  }
889 
890  // Connection accepted, get the SYN packet from the client if
891  // TOS reflect is enabled
892  if (kIsLinux && clientSocket >= 0 && tosReflect_) {
893  std::array<uint32_t, 64> buffer;
894  socklen_t len = sizeof(buffer);
895  int ret =
896  getsockopt(clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len);
897 
898  if (ret == 0) {
899  uint32_t tosWord = folly::Endian::big(buffer[0]);
900  if (addressFamily == AF_INET6) {
901  tosWord = (tosWord & 0x0FC00000) >> 20;
902  ret = setsockopt(
903  clientSocket,
904  IPPROTO_IPV6,
905  IPV6_TCLASS,
906  &tosWord,
907  sizeof(tosWord));
908  } else if (addressFamily == AF_INET) {
909  tosWord = (tosWord & 0x00FC0000) >> 16;
910  ret = setsockopt(
911  clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord));
912  }
913 
914  if (ret != 0) {
915  LOG(ERROR) << "Unable to set TOS for accepted socket "
916  << clientSocket;
917  }
918  } else {
919  LOG(ERROR) << "Unable to get SYN packet for accepted socket "
920  << clientSocket;
921  }
922  }
923 
924  std::chrono::time_point<std::chrono::steady_clock> nowMs =
926  auto timeSinceLastAccept = std::max<int64_t>(
927  0,
928  nowMs.time_since_epoch().count() -
929  lastAccepTimestamp_.time_since_epoch().count());
930  lastAccepTimestamp_ = nowMs;
931  if (acceptRate_ < 1) {
932  acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
933  if (acceptRate_ >= 1) {
934  acceptRate_ = 1;
935  } else if (rand() > acceptRate_ * RAND_MAX) {
937  if (clientSocket >= 0) {
938  closeNoInt(clientSocket);
941  clientSocket, address);
942  }
943  }
944  continue;
945  }
946  }
947 
948  if (clientSocket < 0) {
949  if (errno == EAGAIN) {
950  // No more sockets to accept right now.
951  // Check for this code first, since it's the most common.
952  return;
953  } else if (errno == EMFILE || errno == ENFILE) {
954  // We're out of file descriptors. Perhaps we're accepting connections
955  // too quickly. Pause accepting briefly to back off and give the server
956  // a chance to recover.
957  LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
958  "back-off state";
959  enterBackoff();
960 
961  // Dispatch the error message
962  dispatchError("accept() failed", errno);
963  } else {
964  dispatchError("accept() failed", errno);
965  }
968  }
969  return;
970  }
971 
972 #ifndef SOCK_NONBLOCK
973  // Explicitly set the new connection to non-blocking mode
974  if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
975  closeNoInt(clientSocket);
977  "failed to set accepted socket to non-blocking mode", errno);
979  connectionEventCallback_->onConnectionDropped(clientSocket, address);
980  }
981  return;
982  }
983 #endif
984 
985  // Inform the callback about the new connection
986  dispatchSocket(clientSocket, std::move(address));
987 
988  // If we aren't accepting any more, break out of the loop
989  if (!accepting_ || callbacks_.empty()) {
990  break;
991  }
992  }
993 }
994 
996  uint32_t startingIndex = callbackIndex_;
997 
998  // Short circuit if the callback is in the primary EventBase thread
999 
1001  if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
1002  info->callback->connectionAccepted(socket, address);
1003  return;
1004  }
1005 
1006  const SocketAddress addr(address);
1007  // Create a message to send over the notification queue
1008  QueueMessage msg;
1010  msg.address = std::move(address);
1011  msg.fd = socket;
1012 
1013  // Loop until we find a free queue to write to
1014  while (true) {
1015  if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
1018  socket, addr);
1019  }
1020  // Success! return.
1021  return;
1022  }
1023 
1024  // We couldn't add to queue. Fall through to below
1025 
1027  if (acceptRateAdjustSpeed_ > 0) {
1028  // aggressively decrease accept rate when in trouble
1029  static const double kAcceptRateDecreaseSpeed = 0.1;
1030  acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
1031  }
1032 
1033  if (callbackIndex_ == startingIndex) {
1034  // The notification queue was full
1035  // We can't really do anything at this point other than close the socket.
1036  //
1037  // This should only happen if a user's service is behaving extremely
1038  // badly and none of the EventBase threads are looping fast enough to
1039  // process the incoming connections. If the service is overloaded, it
1040  // should use pauseAccepting() to temporarily back off accepting new
1041  // connections, before they reach the point where their threads can't
1042  // even accept new messages.
1043  LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
1044  << " all accept callback queues are full";
1045  closeNoInt(socket);
1048  }
1049  return;
1050  }
1051 
1052  info = nextCallback();
1053  }
1054 }
1055 
// Reports an accept error to one accept callback.
//
// msgstr:     description of the failure.
// errnoValue: errno value associated with the failure.
//
// If the current callback runs in the primary EventBase (its eventBase is null
// or equals eventBase_), acceptError() is invoked directly. Otherwise a
// QueueMessage is offered to that callback's consumer queue; when the queue
// rejects it the next callback is tried, and once the search is back at the
// starting index the error is only logged and dropped.
//
// NOTE(review): listing lines 1058 and 1062 are absent from this copy of the
// file. `info` is used below without a visible declaration, and no assignment
// to msg.type is visible; restore both from the upstream source.
1056 void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) {
1057  uint32_t startingIndex = callbackIndex_;
1059 
1060  // Create a message to send over the notification queue
1061  QueueMessage msg;
1063  msg.err = errnoValue;
1064  msg.msg = std::move(msgstr);
1065 
1066  while (true) {
1067  // Short circuit if the callback is in the primary EventBase thread
1068  if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
// The text is msgstr immediately followed by the errno number; no separator
// is inserted between them.
1069  std::runtime_error ex(
1070  std::string(msgstr) + folly::to<std::string>(errnoValue));
1071  info->callback->acceptError(ex);
1072  return;
1073  }
1074 
1075  if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
1076  return;
1077  }
1078  // Fall through and try another callback
1079 
1080  if (callbackIndex_ == startingIndex) {
1081  // The notification queues for all of the callbacks were full.
1082  // We can't really do anything at this point.
// NOTE(review): msg was passed through std::move() above; this assumes a
// failed tryPutMessageNoThrow() leaves msg.msg intact - confirm.
1083  LOG_EVERY_N(ERROR, 100)
1084  << "failed to dispatch accept error: all accept"
1085  << " callback queues are full: error msg: " << msg.msg << ": "
1086  << errnoValue;
1087  return;
1088  }
1089  info = nextCallback();
1090  }
1091 }
1092 
1094  // If this is the first time we have entered the backoff state,
1095  // allocate backoffTimeout_.
1096  if (backoffTimeout_ == nullptr) {
1097  try {
1098  backoffTimeout_ = new BackoffTimeout(this);
1099  } catch (const std::bad_alloc&) {
1100  // Man, we couldn't even allocate the timer to re-enable accepts.
1101  // We must be in pretty bad shape. Don't pause accepting for now,
1102  // since we won't be able to re-enable ourselves later.
1103  LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
1104  << " timer; unable to temporarly pause accepting";
1107  }
1108  return;
1109  }
1110  }
1111 
1112  // For now, we simply pause accepting for 1 second.
1113  //
1114  // We could add some smarter backoff calculation here in the future. (e.g.,
1115  // start sleeping for longer if we keep hitting the backoff frequently.)
1116  // Typically the user needs to figure out why the server is overloaded and
1117  // fix it in some other way, though. The backoff timer is just a simple
1118  // mechanism to try and give the connection processing code a little bit of
1119  // breathing room to catch up, and to avoid just spinning and failing to
1120  // accept over and over again.
1121  const uint32_t timeoutMS = 1000;
1122  if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
1123  LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
1124  << "unable to temporarly pause accepting";
1127  }
1128  return;
1129  }
1130 
1131  // The backoff timer is scheduled to re-enable accepts.
1132  // Go ahead and disable accepts for now. We leave accepting_ set to true,
1133  // since that tracks the desired state requested by the user.
1134  for (auto& handler : sockets_) {
1135  handler.unregisterHandler();
1136  }
1139  }
1140 }
1141 
1143  // accepting_ should still be true.
1144  // If pauseAccepting() was called while in the backoff state it will cancel
1145  // the backoff timeout.
1146  assert(accepting_);
1147  // We can't be detached from the EventBase without being paused
1148  assert(eventBase_ != nullptr);
1150 
1151  // If all of the callbacks were removed, we shouldn't re-enable accepts
1152  if (callbacks_.empty()) {
1155  }
1156  return;
1157  }
1158 
1159  // Register the handler.
1160  for (auto& handler : sockets_) {
1161  if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
1162  // We're hosed. We could just re-schedule backoffTimeout_ to
1163  // re-try again after a little bit. However, we don't want to
1164  // loop retrying forever if we can't re-enable accepts. Just
1165  // abort the entire program in this state; things are really bad
1166  // and restarting the entire server is probably the best remedy.
1167  LOG(ERROR)
1168  << "failed to re-enable AsyncServerSocket accepts after backoff; "
1169  << "crashing now";
1170  abort();
1171  }
1172  }
1175  }
1176 }
1177 
1178 } // namespace folly
chrono
Definition: CMakeCache.txt:563
std::vector< uint8_t > buffer(kBufferSize+16)
void setMaxReadAtOnce(uint32_t maxAtOnce)
AsyncServerSocket(AsyncServerSocket &&)=delete
def info()
Definition: deadlock.py:447
*than *hazptr_holder h
Definition: Hazptr.h:116
#define SO_ZEROCOPY
Definition: NetOps.h:51
int shutdownNoInt(NetworkSocket fd, int how)
Definition: FileUtil.cpp:98
virtual void handlerReady(uint16_t events, int socket, sa_family_t family) noexcept
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
void messageAvailable(QueueMessage &&message) noexcept override
static const uint32_t kDefaultMaxMessagesInQueue
std::vector< ServerEventHandler > sockets_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
virtual void onConnectionDequeuedByAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
socklen_t getActualSize() const
int stopAccepting(int shutdownFlags=-1)
STL namespace.
void setFromSockaddr(const struct sockaddr *address)
double val
Definition: String.cpp:273
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
virtual void bind(const SocketAddress &address)
uint16_t getPort() const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::string describe() const
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void listen(int backlog)
#define nullptr
Definition: http_parser.c:41
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue)
void dispatchError(const char *msg, int errnoValue)
void handler(int, siginfo_t *, void *)
void attachEventBase(EventBase *eventBase)
void useExistingSockets(const std::vector< int > &fds)
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
static int getPortFrom(const struct sockaddr *address)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
sa_family_t getFamily() const
static T big(T x)
Definition: Bits.h:259
std::vector< int > pendingCloseSockets_
EventBase * eventBase_
Definition: EventHandler.h:198
virtual void connectionAccepted(int fd, const SocketAddress &clientAddr) noexcept=0
AsyncServerSocket::UniquePtr socket_
CallbackInfo * nextCallback()
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
NotificationQueue< QueueMessage > queue_
ShutdownSocketSet shutdownSocketSet
static const uint32_t kDefaultMaxAcceptAtOnce
#define TCP_SAVE_SYN
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
int setCloseOnExec(int fd, int value)
static constexpr bool msgErrQueueSupported
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wNewSS)
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
#define TCP_SAVED_SYN
int tfo_enable(int, size_t)
fbstring errnoStr(int err)
Definition: String.cpp:463
NotificationQueue< QueueMessage > * getQueue()
SocketAddress getAddress() const
void dispatchSocket(int socket, SocketAddress &&address)
static const uint32_t kDefaultCallbackAcceptAtOnce
ConnectionEventCallback * connectionEventCallback_
virtual void onConnectionEnqueuedForAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
std::vector< SocketAddress > getAddresses() const
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
void setFromLocalAddress(int socket)
const char * string
Definition: Conv.cpp:212
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static const char * getFamilyNameFrom(const struct sockaddr *address, const char *defaultResult=nullptr)
static set< string > s
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
bool scheduleTimeout(uint32_t milliseconds)
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAccepted(const int socket, const SocketAddress &addr) noexcept=0
int errnoValue
Definition: Subprocess.cpp:225
constexpr auto kIsWindows
Definition: Portability.h:367
#define SO_REUSEPORT
void throwSystemError(Args &&...args)
Definition: Exception.h:76
virtual void onConnectionAcceptError(const int err) noexcept=0
void stop(EventBase *eventBase, AcceptCallback *callback)
void setupSocket(int fd, int family)
void setTosReflect(bool enable)
BackoffTimeout * backoffTimeout_
ThreadPoolListHook * addr
virtual void acceptError(const std::exception &ex) noexcept=0
constexpr auto kIsLinux
Definition: Portability.h:361
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
Definition: NetOps.cpp:71
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)