proxygen
folly::AsyncServerSocket Class Reference

#include <AsyncServerSocket.h>

Inheritance diagram for folly::AsyncServerSocket:
folly::DelayedDestruction folly::AsyncSocketBase folly::DelayedDestructionBase folly::test::MockAsyncServerSocket

Classes

class  AcceptCallback
 
class  BackoffTimeout
 
struct  CallbackInfo
 
class  ConnectionEventCallback
 
struct  QueueMessage
 
class  RemoteAcceptor
 
struct  ServerEventHandler
 

Public Types

typedef std::unique_ptr< AsyncServerSocket, DestructorUniquePtr
 

Public Member Functions

 AsyncServerSocket (AsyncServerSocket &&)=delete
 
 AsyncServerSocket (EventBase *eventBase=nullptr)
 
void setShutdownSocketSet (const std::weak_ptr< ShutdownSocketSet > &wNewSS)
 
void destroy () override
 
void attachEventBase (EventBase *eventBase)
 
void detachEventBase ()
 
EventBasegetEventBase () const override
 
void useExistingSocket (int fd)
 
void useExistingSockets (const std::vector< int > &fds)
 
std::vector< int > getSockets () const
 
int getSocket () const
 
bool setZeroCopy (bool enable)
 
virtual void bind (const SocketAddress &address)
 
virtual void bind (const std::vector< IPAddress > &ipAddresses, uint16_t port)
 
virtual void bind (uint16_t port)
 
void getAddress (SocketAddress *addressReturn) const override
 
SocketAddress getAddress () const
 
std::vector< SocketAddressgetAddresses () const
 
virtual void listen (int backlog)
 
virtual void addAcceptCallback (AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)
 
void removeAcceptCallback (AcceptCallback *callback, EventBase *eventBase)
 
virtual void startAccepting ()
 
void pauseAccepting ()
 
int stopAccepting (int shutdownFlags=-1)
 
uint32_t getMaxAcceptAtOnce () const
 
void setMaxAcceptAtOnce (uint32_t numConns)
 
uint32_t getMaxNumMessagesInQueue () const
 
void setMaxNumMessagesInQueue (uint32_t num)
 
double getAcceptRateAdjustSpeed () const
 
void setAcceptRateAdjustSpeed (double speed)
 
void setTosReflect (bool enable)
 
bool getTosReflect ()
 
std::size_t getNumDroppedConnections () const
 
int64_t getNumPendingMessagesInQueue () const
 
void setKeepAliveEnabled (bool enabled)
 
bool getKeepAliveEnabled () const
 
void setReusePortEnabled (bool enabled)
 
bool getReusePortEnabled_ () const
 
void setCloseOnExec (bool closeOnExec)
 
bool getCloseOnExec () const
 
void setTFOEnabled (bool enabled, uint32_t maxTFOQueueSize)
 
void disableTransparentTls ()
 
bool getAccepting () const
 
void setConnectionEventCallback (ConnectionEventCallback *const connectionEventCallback)
 
ConnectionEventCallbackgetConnectionEventCallback () const
 
- Public Member Functions inherited from folly::DelayedDestruction
bool getDestroyPending () const
 
- Public Member Functions inherited from folly::DelayedDestructionBase
virtual ~DelayedDestructionBase ()=default
 
- Public Member Functions inherited from folly::AsyncSocketBase
virtual ~AsyncSocketBase ()=default
 

Static Public Member Functions

static std::shared_ptr< AsyncServerSocketnewSocket (EventBase *evb=nullptr)
 

Static Public Attributes

static const uint32_t kDefaultMaxAcceptAtOnce = 30
 
static const uint32_t kDefaultCallbackAcceptAtOnce = 5
 
static const uint32_t kDefaultMaxMessagesInQueue = 1024
 

Protected Member Functions

 ~AsyncServerSocket () override
 
- Protected Member Functions inherited from folly::DelayedDestruction
 ~DelayedDestruction () override=default
 
 DelayedDestruction ()
 
- Protected Member Functions inherited from folly::DelayedDestructionBase
 DelayedDestructionBase ()
 
uint32_t getDestructorGuardCount () const
 

Private Types

enum  MessageType { MessageType::MSG_NEW_CONN = 0, MessageType::MSG_ERROR = 1 }
 

Private Member Functions

virtual void handlerReady (uint16_t events, int socket, sa_family_t family) noexcept
 
int createSocket (int family)
 
void setupSocket (int fd, int family)
 
void bindSocket (int fd, const SocketAddress &address, bool isExistingSocket)
 
void dispatchSocket (int socket, SocketAddress &&address)
 
void dispatchError (const char *msg, int errnoValue)
 
void enterBackoff ()
 
void backoffTimeoutExpired ()
 
CallbackInfonextCallback ()
 

Private Attributes

EventBaseeventBase_
 
std::vector< ServerEventHandlersockets_
 
std::vector< int > pendingCloseSockets_
 
bool accepting_
 
uint32_t maxAcceptAtOnce_
 
uint32_t maxNumMsgsInQueue_
 
double acceptRateAdjustSpeed_
 
double acceptRate_
 
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
 
std::size_t numDroppedConnections_
 
uint32_t callbackIndex_
 
BackoffTimeoutbackoffTimeout_
 
std::vector< CallbackInfocallbacks_
 
bool keepAliveEnabled_
 
bool reusePortEnabled_ {false}
 
bool closeOnExec_
 
bool tfo_ {false}
 
bool noTransparentTls_ {false}
 
uint32_t tfoMaxQueueSize_ {0}
 
std::weak_ptr< ShutdownSocketSetwShutdownSocketSet_
 
ConnectionEventCallbackconnectionEventCallback_ {nullptr}
 
bool tosReflect_ {false}
 

Detailed Description

A listening socket that asynchronously informs a callback whenever a new connection has been accepted.

Unlike most async interfaces that always invoke their callback in the same EventBase thread, AsyncServerSocket is unusual in that it can distribute the callbacks across multiple EventBase threads.

This supports a common use case for network servers to distribute incoming connections across a number of EventBase threads. (Servers typically run with one EventBase thread per CPU.)

Despite being able to invoke callbacks in multiple EventBase threads, AsyncServerSocket still has one "primary" EventBase. Operations that modify the AsyncServerSocket state may only be performed from the primary EventBase thread.

Definition at line 65 of file AsyncServerSocket.h.

Member Typedef Documentation

Definition at line 67 of file AsyncServerSocket.h.

Member Enumeration Documentation

Enumerator
MSG_NEW_CONN 
MSG_ERROR 

Definition at line 742 of file AsyncServerSocket.h.

742 { MSG_NEW_CONN = 0, MSG_ERROR = 1 };

Constructor & Destructor Documentation

folly::AsyncServerSocket::AsyncServerSocket ( AsyncServerSocket &&  )
delete

Referenced by newSocket().

folly::AsyncServerSocket::AsyncServerSocket ( EventBase eventBase = nullptr)
explicit

Create a new AsyncServerSocket with the specified EventBase.

Parameters
eventBaseThe EventBase to use for driving the asynchronous I/O. If this parameter is nullptr, attachEventBase() must be called before this socket can begin accepting connections.

Definition at line 162 of file AsyncServerSocket.cpp.

References disableTransparentTls().

163  : eventBase_(eventBase),
164  accepting_(false),
168  acceptRate_(1),
171  callbackIndex_(0),
172  backoffTimeout_(nullptr),
173  callbacks_(),
174  keepAliveEnabled_(true),
175  closeOnExec_(true) {
177 }
static const uint32_t kDefaultMaxMessagesInQueue
std::chrono::steady_clock::time_point now()
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
std::vector< CallbackInfo > callbacks_
static const uint32_t kDefaultMaxAcceptAtOnce
BackoffTimeout * backoffTimeout_
folly::AsyncServerSocket::~AsyncServerSocket ( )
overrideprotected

Protected destructor.

Invoke destroy() instead to destroy the AsyncServerSocket.

Definition at line 203 of file AsyncServerSocket.cpp.

References callbacks_.

Referenced by getConnectionEventCallback().

203  {
204  assert(callbacks_.empty());
205 }
std::vector< CallbackInfo > callbacks_

Member Function Documentation

void folly::AsyncServerSocket::addAcceptCallback ( AcceptCallback callback,
EventBase eventBase,
uint32_t  maxAtOnce = kDefaultCallbackAcceptAtOnce 
)
virtual

Add an AcceptCallback.

When a new socket is accepted, one of the AcceptCallbacks will be invoked with the new socket. The AcceptCallbacks are invoked in a round-robin fashion. This allows the accepted sockets to be distributed among a pool of threads, each running its own EventBase object. This is a common model, since most asynchronous-style servers typically run one EventBase thread per CPU.

The EventBase object associated with each AcceptCallback must be running its loop. If the EventBase loop is not running, sockets will still be scheduled for the callback, but the callback cannot actually get invoked until the loop runs.

This method must be invoked from the AsyncServerSocket's primary EventBase thread.

Note that startAccepting() must be called on the AsyncServerSocket to cause it to actually start accepting sockets once callbacks have been installed.

Parameters
callbackThe callback to invoke.
eventBaseThe EventBase to use to invoke the callback. This parameter may be nullptr, in which case the callback will be invoked in the AsyncServerSocket's primary EventBase.
maxAtOnceThe maximum number of connections to accept in this callback on a single iteration of the event base loop. This only takes effect when eventBase is non-nullptr. When using a nullptr eventBase for the callback, the setMaxAcceptAtOnce() method controls how many connections the main event base will accept at once.

Definition at line 586 of file AsyncServerSocket.cpp.

References accepting_, folly::AsyncServerSocket::AcceptCallback::acceptStarted(), callbacks_, connectionEventCallback_, folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, maxNumMsgsInQueue_, folly::AsyncServerSocket::RemoteAcceptor::start(), and startAccepting().

Referenced by folly::ZeroCopyTestServer::addCallbackToServerSocket(), getAddress(), wangle::Acceptor::init(), and serverSocketSanityTest().

589  {
590  if (eventBase_) {
592  }
593 
594  // If this is the first accept callback and we are supposed to be accepting,
595  // start accepting once the callback is installed.
596  bool runStartAccepting = accepting_ && callbacks_.empty();
597 
598  callbacks_.emplace_back(callback, eventBase);
599 
600  SCOPE_SUCCESS {
601  // If this is the first accept callback and we are supposed to be accepting,
602  // start accepting.
603  if (runStartAccepting) {
604  startAccepting();
605  }
606  };
607 
608  if (!eventBase) {
609  // Run in AsyncServerSocket's eventbase; notify that we are
610  // starting to accept connections
611  callback->acceptStarted();
612  return;
613  }
614 
615  // Start the remote acceptor.
616  //
617  // It would be nice if we could avoid starting the remote acceptor if
618  // eventBase == eventBase_. However, that would cause issues if
619  // detachEventBase() and attachEventBase() were ever used to change the
620  // primary EventBase for the server socket. Therefore we require the caller
621  // to specify a nullptr EventBase if they want to ensure that the callback is
622  // always invoked in the primary EventBase, and to be able to invoke that
623  // callback more efficiently without having to use a notification queue.
624  RemoteAcceptor* acceptor = nullptr;
625  try {
626  acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
627  acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
628  } catch (...) {
629  callbacks_.pop_back();
630  delete acceptor;
631  throw;
632  }
633  callbacks_.back().consumer = acceptor;
634 }
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void folly::AsyncServerSocket::attachEventBase ( EventBase eventBase)

Attach this AsyncServerSocket to its primary EventBase.

This may only be called if the AsyncServerSocket is not already attached to a EventBase. The AsyncServerSocket must be attached to a EventBase before it can begin accepting connections.

Definition at line 274 of file AsyncServerSocket.cpp.

References folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), and sockets_.

Referenced by newSocket(), and folly::AsyncServerSocket::ServerEventHandler::operator=().

274  {
275  assert(eventBase_ == nullptr);
276  eventBase->dcheckIsInEventBaseThread();
277 
278  eventBase_ = eventBase;
279  for (auto& handler : sockets_) {
280  handler.attachEventBase(eventBase);
281  }
282 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
void folly::AsyncServerSocket::backoffTimeoutExpired ( )
private

Definition at line 1142 of file AsyncServerSocket.cpp.

References accepting_, callbacks_, connectionEventCallback_, folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), folly::AsyncServerSocket::ConnectionEventCallback::onBackoffEnded(), folly::EventHandler::PERSIST, folly::EventHandler::READ, and sockets_.

1142  {
1143  // accepting_ should still be true.
1144  // If pauseAccepting() was called while in the backoff state it will cancel
1145  // the backoff timeout.
1146  assert(accepting_);
1147  // We can't be detached from the EventBase without being paused
1148  assert(eventBase_ != nullptr);
1150 
1151  // If all of the callbacks were removed, we shouldn't re-enable accepts
1152  if (callbacks_.empty()) {
1155  }
1156  return;
1157  }
1158 
1159  // Register the handler.
1160  for (auto& handler : sockets_) {
1161  if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
1162  // We're hosed. We could just re-schedule backoffTimeout_ to
1163  // re-try again after a little bit. However, we don't want to
1164  // loop retrying forever if we can't re-enable accepts. Just
1165  // abort the entire program in this state; things are really bad
1166  // and restarting the entire server is probably the best remedy.
1167  LOG(ERROR)
1168  << "failed to re-enable AsyncServerSocket accepts after backoff; "
1169  << "crashing now";
1170  abort();
1171  }
1172  }
1175  }
1176 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void folly::AsyncServerSocket::bind ( const SocketAddress address)
virtual

Bind to the specified address.

This must be called from the primary EventBase thread.

Throws TTransportException on error.

Definition at line 372 of file AsyncServerSocket.cpp.

References bindSocket(), createSocket(), folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, folly::SocketAddress::getFamily(), and sockets_.

Referenced by getSocket().

372  {
373  if (eventBase_) {
375  }
376 
377  // useExistingSocket() may have been called to initialize socket_ already.
378  // However, in the normal case we need to create a new socket now.
379  // Don't set socket_ yet, so that socket_ will remain uninitialized if an
380  // error occurs.
381  int fd;
382  if (sockets_.size() == 0) {
383  fd = createSocket(address.getFamily());
384  } else if (sockets_.size() == 1) {
385  if (address.getFamily() != sockets_[0].addressFamily_) {
386  throw std::invalid_argument(
387  "Attempted to bind address to socket with "
388  "different address family");
389  }
390  fd = sockets_[0].socket_;
391  } else {
392  throw std::invalid_argument("Attempted to bind to multiple fds");
393  }
394 
395  bindSocket(fd, address, !sockets_.empty());
396 }
std::vector< ServerEventHandler > sockets_
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void folly::AsyncServerSocket::bind ( const std::vector< IPAddress > &  ipAddresses,
uint16_t  port 
)
virtual

Bind to the specified port for the specified addresses.

This must be called from the primary EventBase thread.

Throws TTransportException on error.

Definition at line 398 of file AsyncServerSocket.cpp.

References bindSocket(), createSocket(), and sockets_.

400  {
401  if (ipAddresses.empty()) {
402  throw std::invalid_argument("No ip addresses were provided");
403  }
404  if (!sockets_.empty()) {
405  throw std::invalid_argument(
406  "Cannot call bind on a AsyncServerSocket "
407  "that already has a socket.");
408  }
409 
410  for (const IPAddress& ipAddress : ipAddresses) {
411  SocketAddress address(ipAddress.toFullyQualified(), port);
412  int fd = createSocket(address.getFamily());
413 
414  bindSocket(fd, address, false);
415  }
416  if (sockets_.size() == 0) {
417  throw std::runtime_error(
418  "did not bind any async server socket for port and addresses");
419  }
420 }
std::vector< ServerEventHandler > sockets_
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void folly::AsyncServerSocket::bind ( uint16_t  port)
virtual

Bind to the specified port.

This must be called from the primary EventBase thread.

Throws TTransportException on error.

Definition at line 422 of file AsyncServerSocket.cpp.

References folly::netops::bind(), folly::ShutdownSocketSet::close(), folly::closeNoInt(), eventBase_, folly::SocketAddress::getFamily(), folly::SocketAddress::getFamilyNameFrom(), folly::SocketAddress::getPort(), folly::SocketAddress::getPortFrom(), folly::kIsWindows, noTransparentTls_, s, SCOPE_EXIT, folly::SocketAddress::setFromLocalAddress(), folly::netops::setsockopt(), setupSocket(), folly::test::shutdownSocketSet, folly::netops::socket(), sockets_, folly::throwSystemError(), and wShutdownSocketSet_.

422  {
423  struct addrinfo hints, *res0;
424  char sport[sizeof("65536")];
425 
426  memset(&hints, 0, sizeof(hints));
427  hints.ai_family = AF_UNSPEC;
428  hints.ai_socktype = SOCK_STREAM;
429  hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
430  snprintf(sport, sizeof(sport), "%u", port);
431 
432  // On Windows the value we need to pass to bind to all available
433  // addresses is an empty string. Everywhere else, it's nullptr.
434  constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
435  if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
436  throw std::invalid_argument(
437  "Attempted to bind address to socket with "
438  "bad getaddrinfo");
439  }
440 
441  SCOPE_EXIT {
442  freeaddrinfo(res0);
443  };
444 
445  auto setupAddress = [&](struct addrinfo* res) {
446  int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
447  // IPv6/IPv4 may not be supported by the kernel
448  if (s < 0 && errno == EAFNOSUPPORT) {
449  return;
450  }
451  CHECK_GE(s, 0);
452 
453  try {
454  setupSocket(s, res->ai_family);
455  } catch (...) {
456  closeNoInt(s);
457  throw;
458  }
459 
460  if (res->ai_family == AF_INET6) {
461  int v6only = 1;
462  CHECK(
463  0 ==
464  setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)));
465  }
466 
467  // Bind to the socket
468  if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
470  errno,
471  "failed to bind to async server socket for port ",
472  SocketAddress::getPortFrom(res->ai_addr),
473  " family ",
474  SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
475  }
476 
477 #if __linux__
478  if (noTransparentTls_) {
479  // Ignore return value, errors are ok
480  setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
481  }
482 #endif
483 
484  SocketAddress address;
485  address.setFromLocalAddress(s);
486 
487  sockets_.emplace_back(eventBase_, s, this, address.getFamily());
488  };
489 
490  const int kNumTries = 25;
491  for (int tries = 1; true; tries++) {
492  // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
493  // should return IPv6 first and then IPv4 addresses, but glibc's
494  // getaddrinfo(nullptr) with AI_PASSIVE returns:
495  // - 0.0.0.0 (IPv4-only)
496  // - :: (IPv6+IPv4) in this order
497  // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
498  for (struct addrinfo* res = res0; res; res = res->ai_next) {
499  if (res->ai_family == AF_INET6) {
500  setupAddress(res);
501  }
502  }
503 
504  // If port == 0, then we should try to bind to the same port on ipv4 and
505  // ipv6. So if we did bind to ipv6, figure out that port and use it.
506  if (sockets_.size() == 1 && port == 0) {
507  SocketAddress address;
508  address.setFromLocalAddress(sockets_.back().socket_);
509  snprintf(sport, sizeof(sport), "%u", address.getPort());
510  freeaddrinfo(res0);
511  CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
512  }
513 
514  try {
515  for (struct addrinfo* res = res0; res; res = res->ai_next) {
516  if (res->ai_family != AF_INET6) {
517  setupAddress(res);
518  }
519  }
520  } catch (const std::system_error&) {
521  // If we can't bind to the same port on ipv4 as ipv6 when using
522  // port=0 then we will retry again before giving up after
523  // kNumTries attempts. We do this by closing the sockets that
524  // were opened, then restarting from scratch.
525  if (port == 0 && !sockets_.empty() && tries != kNumTries) {
526  for (const auto& socket : sockets_) {
527  if (socket.socket_ <= 0) {
528  continue;
529  } else if (
530  const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
531  shutdownSocketSet->close(socket.socket_);
532  } else {
533  closeNoInt(socket.socket_);
534  }
535  }
536  sockets_.clear();
537  snprintf(sport, sizeof(sport), "%u", port);
538  freeaddrinfo(res0);
539  CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
540  continue;
541  }
542 
543  throw;
544  }
545 
546  break;
547  }
548 
549  if (sockets_.size() == 0) {
550  throw std::runtime_error("did not bind any async server socket for port");
551  }
552 }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
std::vector< ServerEventHandler > sockets_
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
static int getPortFrom(const struct sockaddr *address)
ShutdownSocketSet shutdownSocketSet
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static const char * getFamilyNameFrom(const struct sockaddr *address, const char *defaultResult=nullptr)
static set< string > s
constexpr auto kIsWindows
Definition: Portability.h:367
void throwSystemError(Args &&...args)
Definition: Exception.h:76
void setupSocket(int fd, int family)
void folly::AsyncServerSocket::bindSocket ( int  fd,
const SocketAddress address,
bool  isExistingSocket 
)
private

Definition at line 331 of file AsyncServerSocket.cpp.

References folly::netops::bind(), folly::closeNoInt(), folly::SocketAddress::describe(), eventBase_, folly::SocketAddress::getActualSize(), folly::SocketAddress::getAddress(), folly::SocketAddress::getFamily(), noTransparentTls_, folly::netops::setsockopt(), sockets_, and folly::throwSystemError().

Referenced by bind().

334  {
335  sockaddr_storage addrStorage;
336  address.getAddress(&addrStorage);
337  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
338 
339  if (fsp::bind(fd, saddr, address.getActualSize()) != 0) {
340  if (!isExistingSocket) {
341  closeNoInt(fd);
342  }
344  errno, "failed to bind to async server socket: " + address.describe());
345  }
346 
347 #if __linux__
348  if (noTransparentTls_) {
349  // Ignore return value, errors are ok
350  setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
351  }
352 #endif
353 
354  // If we just created this socket, update the EventHandler and set socket_
355  if (!isExistingSocket) {
356  sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
357  }
358 }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
std::vector< ServerEventHandler > sockets_
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
void throwSystemError(Args &&...args)
Definition: Exception.h:76
int folly::AsyncServerSocket::createSocket ( int  family)
private

Definition at line 738 of file AsyncServerSocket.cpp.

References folly::closeNoInt(), setupSocket(), folly::netops::socket(), and folly::throwSystemError().

Referenced by bind().

738  {
739  int fd = fsp::socket(family, SOCK_STREAM, 0);
740  if (fd == -1) {
741  folly::throwSystemError(errno, "error creating async server socket");
742  }
743 
744  try {
745  setupSocket(fd, family);
746  } catch (...) {
747  closeNoInt(fd);
748  throw;
749  }
750  return fd;
751 }
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
void throwSystemError(Args &&...args)
Definition: Exception.h:76
void setupSocket(int fd, int family)
void folly::AsyncServerSocket::destroy ( )
overridevirtual

Destroy the socket.

AsyncServerSocket::destroy() must be called to destroy the socket. The normal destructor is private, and should not be invoked directly. This prevents callers from deleting a AsyncServerSocket while it is invoking a callback.

destroy() must be invoked from the socket's primary EventBase thread.

If there are AcceptCallbacks still installed when destroy() is called, acceptStopped() will be called on these callbacks to notify them that accepting has stopped. Accept callbacks being driven by other EventBase threads may continue to receive new accept callbacks for a brief period of time after destroy() returns. They will not receive any more callback invocations once acceptStopped() is invoked.

Reimplemented from folly::DelayedDestruction.

Definition at line 264 of file AsyncServerSocket.cpp.

References folly::closeNoInt(), folly::DelayedDestruction::destroy(), pendingCloseSockets_, s, and stopAccepting().

Referenced by newSocket(), and wangle::AsyncServerSocketFactory::ThreadSafeDestructor::operator()().

264  {
265  stopAccepting();
266  for (auto s : pendingCloseSockets_) {
267  closeNoInt(s);
268  }
269  // Then call DelayedDestruction::destroy() to take care of
270  // whether or not we need immediate or delayed destruction
272 }
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
int stopAccepting(int shutdownFlags=-1)
std::vector< int > pendingCloseSockets_
static set< string > s
void folly::AsyncServerSocket::detachEventBase ( )

Detach the AsyncServerSocket from its primary EventBase.

detachEventBase() may only be called if the AsyncServerSocket is not currently accepting connections.

Definition at line 284 of file AsyncServerSocket.cpp.

References accepting_, folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), and sockets_.

Referenced by newSocket(), and folly::AsyncServerSocket::ServerEventHandler::operator=().

284  {
285  assert(eventBase_ != nullptr);
287  assert(!accepting_);
288 
289  eventBase_ = nullptr;
290  for (auto& handler : sockets_) {
291  handler.detachEventBase();
292  }
293 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void folly::AsyncServerSocket::disableTransparentTls ( )
inline

Do not attempt the transparent TLS handshake

Definition at line 707 of file AsyncServerSocket.h.

References noTransparentTls_.

Referenced by AsyncServerSocket().

707  {
708  noTransparentTls_ = true;
709  }
void folly::AsyncServerSocket::dispatchError ( const char *  msg,
int  errnoValue 
)
private

Definition at line 1056 of file AsyncServerSocket.cpp.

References folly::AsyncServerSocket::AcceptCallback::acceptError(), folly::AsyncServerSocket::CallbackInfo::callback, callbackIndex_, folly::AsyncServerSocket::CallbackInfo::consumer, folly::AsyncServerSocket::QueueMessage::err, errnoValue, folly::AsyncServerSocket::CallbackInfo::eventBase, folly::AsyncServerSocket::RemoteAcceptor::getQueue(), deadlock::info(), folly::gen::move, folly::AsyncServerSocket::QueueMessage::msg, MSG_ERROR, nextCallback(), string, folly::AsyncServerSocket::QueueMessage::type, and uint32_t.

Referenced by handlerReady().

1056  {
1057  uint32_t startingIndex = callbackIndex_;
1058  CallbackInfo* info = nextCallback();
1059 
1060  // Create a message to send over the notification queue
1061  QueueMessage msg;
1062  msg.type = MessageType::MSG_ERROR;
1063  msg.err = errnoValue;
1064  msg.msg = std::move(msgstr);
1065 
1066  while (true) {
1067  // Short circuit if the callback is in the primary EventBase thread
1068  if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
1069  std::runtime_error ex(
1070  std::string(msgstr) + folly::to<std::string>(errnoValue));
1071  info->callback->acceptError(ex);
1072  return;
1073  }
1074 
1075  if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
1076  return;
1077  }
1078  // Fall through and try another callback
1079 
1080  if (callbackIndex_ == startingIndex) {
1081  // The notification queues for all of the callbacks were full.
1082  // We can't really do anything at this point.
1083  LOG_EVERY_N(ERROR, 100)
1084  << "failed to dispatch accept error: all accept"
1085  << " callback queues are full: error msg: " << msg.msg << ": "
1086  << errnoValue;
1087  return;
1088  }
1089  info = nextCallback();
1090  }
1091 }
def info()
Definition: deadlock.py:447
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
CallbackInfo * nextCallback()
const char * string
Definition: Conv.cpp:212
int errnoValue
Definition: Subprocess.cpp:225
void folly::AsyncServerSocket::dispatchSocket ( int  socket,
SocketAddress &&  address 
)
private

Definition at line 995 of file AsyncServerSocket.cpp.

References acceptRate_, acceptRateAdjustSpeed_, addr, folly::AsyncServerSocket::QueueMessage::address, folly::AsyncServerSocket::CallbackInfo::callback, callbackIndex_, folly::closeNoInt(), folly::AsyncServerSocket::AcceptCallback::connectionAccepted(), connectionEventCallback_, folly::AsyncServerSocket::CallbackInfo::consumer, folly::AsyncServerSocket::CallbackInfo::eventBase, folly::AsyncServerSocket::QueueMessage::fd, folly::AsyncServerSocket::RemoteAcceptor::getQueue(), deadlock::info(), folly::gen::move, MSG_NEW_CONN, nextCallback(), numDroppedConnections_, folly::AsyncServerSocket::ConnectionEventCallback::onConnectionDropped(), folly::AsyncServerSocket::ConnectionEventCallback::onConnectionEnqueuedForAcceptorCallback(), folly::netops::socket(), folly::AsyncServerSocket::QueueMessage::type, and uint32_t.

Referenced by handlerReady().

995  {
996  uint32_t startingIndex = callbackIndex_;
997 
998  // Short circuit if the callback is in the primary EventBase thread
999 
1000  CallbackInfo* info = nextCallback();
1001  if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
1002  info->callback->connectionAccepted(socket, address);
1003  return;
1004  }
1005 
1006  const SocketAddress addr(address);
1007  // Create a message to send over the notification queue
1008  QueueMessage msg;
1009  msg.type = MessageType::MSG_NEW_CONN;
1010  msg.address = std::move(address);
1011  msg.fd = socket;
1012 
1013  // Loop until we find a free queue to write to
1014  while (true) {
1015  if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
1018  socket, addr);
1019  }
1020  // Success! return.
1021  return;
1022  }
1023 
1024  // We couldn't add to queue. Fall through to below
1025 
1027  if (acceptRateAdjustSpeed_ > 0) {
1028  // aggressively decrease accept rate when in trouble
1029  static const double kAcceptRateDecreaseSpeed = 0.1;
1030  acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
1031  }
1032 
1033  if (callbackIndex_ == startingIndex) {
1034  // The notification queue was full
1035  // We can't really do anything at this point other than close the socket.
1036  //
1037  // This should only happen if a user's service is behaving extremely
1038  // badly and none of the EventBase threads are looping fast enough to
1039  // process the incoming connections. If the service is overloaded, it
1040  // should use pauseAccepting() to temporarily back off accepting new
1041  // connections, before they reach the point where their threads can't
1042  // even accept new messages.
1043  LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
1044  << " all accept callback queues are full";
1045  closeNoInt(socket);
1048  }
1049  return;
1050  }
1051 
1052  info = nextCallback();
1053  }
1054 }
def info()
Definition: deadlock.py:447
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ConnectionEventCallback * connectionEventCallback_
CallbackInfo * nextCallback()
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
virtual void onConnectionEnqueuedForAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
ThreadPoolListHook * addr
void folly::AsyncServerSocket::enterBackoff ( )
private

Definition at line 1093 of file AsyncServerSocket.cpp.

References backoffTimeout_, connectionEventCallback_, handler(), folly::AsyncServerSocket::ConnectionEventCallback::onBackoffError(), folly::AsyncServerSocket::ConnectionEventCallback::onBackoffStarted(), folly::AsyncTimeout::scheduleTimeout(), sockets_, and uint32_t.

Referenced by handlerReady().

1093  {
1094  // If this is the first time we have entered the backoff state,
1095  // allocate backoffTimeout_.
1096  if (backoffTimeout_ == nullptr) {
1097  try {
1098  backoffTimeout_ = new BackoffTimeout(this);
1099  } catch (const std::bad_alloc&) {
1100  // Man, we couldn't even allocate the timer to re-enable accepts.
1101  // We must be in pretty bad shape. Don't pause accepting for now,
1102  // since we won't be able to re-enable ourselves later.
1103  LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
1104  << " timer; unable to temporarly pause accepting";
1107  }
1108  return;
1109  }
1110  }
1111 
1112  // For now, we simply pause accepting for 1 second.
1113  //
1114  // We could add some smarter backoff calculation here in the future. (e.g.,
1115  // start sleeping for longer if we keep hitting the backoff frequently.)
1116  // Typically the user needs to figure out why the server is overloaded and
1117  // fix it in some other way, though. The backoff timer is just a simple
1118  // mechanism to try and give the connection processing code a little bit of
1119  // breathing room to catch up, and to avoid just spinning and failing to
1120  // accept over and over again.
1121  const uint32_t timeoutMS = 1000;
1122  if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
1123  LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
1124  << "unable to temporarly pause accepting";
1127  }
1128  return;
1129  }
1130 
1131  // The backoff timer is scheduled to re-enable accepts.
1132  // Go ahead and disable accepts for now. We leave accepting_ set to true,
1133  // since that tracks the desired state requested by the user.
1134  for (auto& handler : sockets_) {
1135  handler.unregisterHandler();
1136  }
1139  }
1140 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
ConnectionEventCallback * connectionEventCallback_
bool scheduleTimeout(uint32_t milliseconds)
BackoffTimeout * backoffTimeout_
bool folly::AsyncServerSocket::getAccepting ( ) const
inline

Get whether or not the socket is accepting new connections

Definition at line 714 of file AsyncServerSocket.h.

References accepting_.

714  {
715  return accepting_;
716  }
double folly::AsyncServerSocket::getAcceptRateAdjustSpeed ( ) const
inline

Get the speed of adjusting connection accept rate.

Definition at line 569 of file AsyncServerSocket.h.

References acceptRateAdjustSpeed_.

569  {
570  return acceptRateAdjustSpeed_;
571  }
void folly::AsyncServerSocket::getAddress ( SocketAddress addressReturn) const
overridevirtual

Get the local address to which the socket is bound.

Throws TTransportException on error.

Implements folly::AsyncSocketBase.

Definition at line 567 of file AsyncServerSocket.cpp.

References folly::SocketAddress::setFromLocalAddress(), socket_, and sockets_.

Referenced by serverSocketSanityTest().

567  {
568  CHECK(sockets_.size() >= 1);
569  VLOG_IF(2, sockets_.size() > 1)
570  << "Warning: getAddress() called and multiple addresses available ("
571  << sockets_.size() << "). Returning only the first one.";
572 
573  addressReturn->setFromLocalAddress(sockets_[0].socket_);
574 }
std::vector< ServerEventHandler > sockets_
AsyncServerSocket::UniquePtr socket_
SocketAddress folly::AsyncServerSocket::getAddress ( ) const
inline

Get the local address to which the socket is bound.

Throws TTransportException on error.

Definition at line 368 of file AsyncServerSocket.h.

References addAcceptCallback(), getAddresses(), listen(), pauseAccepting(), removeAcceptCallback(), startAccepting(), stopAccepting(), and uint32_t.

Referenced by getSocket().

368  {
369  SocketAddress ret;
370  getAddress(&ret);
371  return ret;
372  }
SocketAddress getAddress() const
std::vector< SocketAddress > folly::AsyncServerSocket::getAddresses ( ) const

Get all the local addresses to which the socket is bound.

Throws TTransportException on error.

Definition at line 576 of file AsyncServerSocket.cpp.

References folly::netops::socket(), and sockets_.

Referenced by getAddress().

576  {
577  CHECK(sockets_.size() >= 1);
578  auto tsaVec = std::vector<SocketAddress>(sockets_.size());
579  auto tsaIter = tsaVec.begin();
580  for (const auto& socket : sockets_) {
581  (tsaIter++)->setFromLocalAddress(socket.socket_);
582  };
583  return tsaVec;
584 }
std::vector< ServerEventHandler > sockets_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
bool folly::AsyncServerSocket::getCloseOnExec ( ) const
inline

Get whether or not FD_CLOEXEC is enabled on the server socket.

Definition at line 692 of file AsyncServerSocket.h.

References closeOnExec_.

692  {
693  return closeOnExec_;
694  }
ConnectionEventCallback* folly::AsyncServerSocket::getConnectionEventCallback ( ) const
inline

Get the ConnectionEventCallback

Definition at line 729 of file AsyncServerSocket.h.

References connectionEventCallback_, and ~AsyncServerSocket().

729  {
731  }
ConnectionEventCallback * connectionEventCallback_
EventBase* folly::AsyncServerSocket::getEventBase ( ) const
inlineoverridevirtual

Get the EventBase used by this socket.

Implements folly::AsyncSocketBase.

Definition at line 273 of file AsyncServerSocket.h.

References eventBase_, useExistingSocket(), and useExistingSockets().

Referenced by wangle::AsyncServerSocketFactory::ThreadSafeDestructor::operator()(), and serverSocketSanityTest().

273  {
274  return eventBase_;
275  }
bool folly::AsyncServerSocket::getKeepAliveEnabled ( ) const
inline

Get whether or not SO_KEEPALIVE is enabled on the server socket.

Definition at line 647 of file AsyncServerSocket.h.

References keepAliveEnabled_.

647  {
648  return keepAliveEnabled_;
649  }
uint32_t folly::AsyncServerSocket::getMaxAcceptAtOnce ( ) const
inline

Get the maximum number of connections that will be accepted each time around the event loop.

Definition at line 525 of file AsyncServerSocket.h.

References maxAcceptAtOnce_.

525  {
526  return maxAcceptAtOnce_;
527  }
uint32_t folly::AsyncServerSocket::getMaxNumMessagesInQueue ( ) const
inline

Get the maximum number of unprocessed messages which a NotificationQueue can hold.

Definition at line 551 of file AsyncServerSocket.h.

References maxNumMsgsInQueue_.

551  {
552  return maxNumMsgsInQueue_;
553  }
std::size_t folly::AsyncServerSocket::getNumDroppedConnections ( ) const
inline

Get the number of connections dropped by the AsyncServerSocket

Definition at line 592 of file AsyncServerSocket.h.

References numDroppedConnections_.

592  {
593  return numDroppedConnections_;
594  }
int64_t folly::AsyncServerSocket::getNumPendingMessagesInQueue ( ) const
inline

Get the current number of unprocessed messages in NotificationQueue.

This method must be invoked from the AsyncServerSocket's primary EventBase thread. Use EventBase::runInEventBaseThread() to schedule the operation in the correct EventBase if your code is not in the server socket's primary EventBase.

Definition at line 604 of file AsyncServerSocket.h.

References callbacks_, folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, and int64_t.

604  {
605  if (eventBase_) {
607  }
608  int64_t numMsgs = 0;
609  for (const auto& callback : callbacks_) {
610  if (callback.consumer) {
611  numMsgs += callback.consumer->getQueue()->size();
612  }
613  }
614  return numMsgs;
615  }
std::vector< CallbackInfo > callbacks_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
bool folly::AsyncServerSocket::getReusePortEnabled_ ( ) const
inline

Get whether or not SO_REUSEPORT is enabled on the server socket.

Definition at line 677 of file AsyncServerSocket.h.

References reusePortEnabled_.

677  {
678  return reusePortEnabled_;
679  }
int folly::AsyncServerSocket::getSocket ( ) const
inline

Backwards compatible getSocket, warns if > 1 socket

Definition at line 312 of file AsyncServerSocket.h.

References bind(), getAddress(), setZeroCopy(), sockets_, and uint16_t.

Referenced by setZeroCopy().

312  {
313  if (sockets_.size() > 1) {
314  VLOG(2) << "Warning: getSocket can return multiple fds, "
315  << "but getSockets was not called, so only returning the first";
316  }
317  if (sockets_.size() == 0) {
318  return -1;
319  } else {
320  return sockets_[0].socket_;
321  }
322  }
std::vector< ServerEventHandler > sockets_
std::vector<int> folly::AsyncServerSocket::getSockets ( ) const
inline

Return the underlying file descriptor

Definition at line 301 of file AsyncServerSocket.h.

References handler(), and sockets_.

Referenced by proxygen::HTTPServer::getListenSocket(), and wangle::Acceptor::init().

301  {
302  std::vector<int> sockets;
303  for (auto& handler : sockets_) {
304  sockets.push_back(handler.socket_);
305  }
306  return sockets;
307  }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
bool folly::AsyncServerSocket::getTosReflect ( )
inline

Definition at line 585 of file AsyncServerSocket.h.

References tosReflect_.

585  {
586  return tosReflect_;
587  }
void folly::AsyncServerSocket::handlerReady ( uint16_t  events,
int  socket,
sa_family_t  family 
)
privatevirtualnoexcept

Definition at line 855 of file AsyncServerSocket.cpp.

References folly::netops::accept(), accepting_, acceptRate_, acceptRateAdjustSpeed_, folly::Endian::big(), buffer(), callbacks_, folly::closeNoInt(), connectionEventCallback_, dispatchError(), dispatchSocket(), enterBackoff(), folly::netops::getsockopt(), folly::kIsLinux, lastAccepTimestamp_, maxAcceptAtOnce_, folly::gen::move, now(), numDroppedConnections_, folly::AsyncServerSocket::ConnectionEventCallback::onConnectionAccepted(), folly::AsyncServerSocket::ConnectionEventCallback::onConnectionAcceptError(), folly::AsyncServerSocket::ConnectionEventCallback::onConnectionDropped(), folly::SocketAddress::setFromSockaddr(), folly::netops::setsockopt(), TCP_SAVED_SYN, tosReflect_, and uint32_t.

858  {
859  assert(!callbacks_.empty());
860  DestructorGuard dg(this);
861 
862  // Only accept up to maxAcceptAtOnce_ connections at a time,
863  // to avoid starving other I/O handlers using this EventBase.
864  for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
865  SocketAddress address;
866 
867  sockaddr_storage addrStorage;
868  socklen_t addrLen = sizeof(addrStorage);
869  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
870 
871  // In some cases, accept() doesn't seem to update these correctly.
872  saddr->sa_family = addressFamily;
873  if (addressFamily == AF_UNIX) {
874  addrLen = sizeof(struct sockaddr_un);
875  }
876 
877  // Accept a new client socket
878 #ifdef SOCK_NONBLOCK
879  int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
880 #else
881  int clientSocket = accept(fd, saddr, &addrLen);
882 #endif
883 
884  address.setFromSockaddr(saddr, addrLen);
885 
886  if (clientSocket >= 0 && connectionEventCallback_) {
887  connectionEventCallback_->onConnectionAccepted(clientSocket, address);
888  }
889 
890  // Connection accepted, get the SYN packet from the client if
891  // TOS reflect is enabled
892  if (kIsLinux && clientSocket >= 0 && tosReflect_) {
893  std::array<uint32_t, 64> buffer;
894  socklen_t len = sizeof(buffer);
895  int ret =
896  getsockopt(clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len);
897 
898  if (ret == 0) {
899  uint32_t tosWord = folly::Endian::big(buffer[0]);
900  if (addressFamily == AF_INET6) {
901  tosWord = (tosWord & 0x0FC00000) >> 20;
902  ret = setsockopt(
903  clientSocket,
904  IPPROTO_IPV6,
905  IPV6_TCLASS,
906  &tosWord,
907  sizeof(tosWord));
908  } else if (addressFamily == AF_INET) {
909  tosWord = (tosWord & 0x00FC0000) >> 16;
910  ret = setsockopt(
911  clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord));
912  }
913 
914  if (ret != 0) {
915  LOG(ERROR) << "Unable to set TOS for accepted socket "
916  << clientSocket;
917  }
918  } else {
919  LOG(ERROR) << "Unable to get SYN packet for accepted socket "
920  << clientSocket;
921  }
922  }
923 
924  std::chrono::time_point<std::chrono::steady_clock> nowMs =
926  auto timeSinceLastAccept = std::max<int64_t>(
927  0,
928  nowMs.time_since_epoch().count() -
929  lastAccepTimestamp_.time_since_epoch().count());
930  lastAccepTimestamp_ = nowMs;
931  if (acceptRate_ < 1) {
932  acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
933  if (acceptRate_ >= 1) {
934  acceptRate_ = 1;
935  } else if (rand() > acceptRate_ * RAND_MAX) {
937  if (clientSocket >= 0) {
938  closeNoInt(clientSocket);
941  clientSocket, address);
942  }
943  }
944  continue;
945  }
946  }
947 
948  if (clientSocket < 0) {
949  if (errno == EAGAIN) {
950  // No more sockets to accept right now.
951  // Check for this code first, since it's the most common.
952  return;
953  } else if (errno == EMFILE || errno == ENFILE) {
954  // We're out of file descriptors. Perhaps we're accepting connections
955  // too quickly. Pause accepting briefly to back off and give the server
956  // a chance to recover.
957  LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
958  "back-off state";
959  enterBackoff();
960 
961  // Dispatch the error message
962  dispatchError("accept() failed", errno);
963  } else {
964  dispatchError("accept() failed", errno);
965  }
968  }
969  return;
970  }
971 
972 #ifndef SOCK_NONBLOCK
973  // Explicitly set the new connection to non-blocking mode
974  if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
975  closeNoInt(clientSocket);
977  "failed to set accepted socket to non-blocking mode", errno);
979  connectionEventCallback_->onConnectionDropped(clientSocket, address);
980  }
981  return;
982  }
983 #endif
984 
985  // Inform the callback about the new connection
986  dispatchSocket(clientSocket, std::move(address));
987 
988  // If we aren't accepting any more, break out of the loop
989  if (!accepting_ || callbacks_.empty()) {
990  break;
991  }
992  }
993 }
std::vector< uint8_t > buffer(kBufferSize+16)
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
void dispatchError(const char *msg, int errnoValue)
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
static T big(T x)
Definition: Bits.h:259
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
#define TCP_SAVED_SYN
void dispatchSocket(int socket, SocketAddress &&address)
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAccepted(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAcceptError(const int err) noexcept=0
constexpr auto kIsLinux
Definition: Portability.h:361
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
Definition: NetOps.cpp:71
void folly::AsyncServerSocket::listen ( int  backlog)
virtual

Begin listening for connections.

This calls listen() with the specified backlog.

Once listen() is invoked the socket will actually be open so that remote clients may establish connections. (Clients that attempt to connect before listen() is called will receive a connection refused error.)

At least one callback must be set and startAccepting() must be called to actually begin notifying the accept callbacks of newly accepted connections. The backlog parameter controls how many connections the kernel will accept and buffer internally while the accept callbacks are paused (or if accepting is enabled but the callbacks cannot keep up).

bind() must be called before calling listen(). listen() must be called from the primary EventBase thread.

Throws TTransportException on error.

Definition at line 554 of file AsyncServerSocket.cpp.

References folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), folly::netops::listen(), sockets_, and folly::throwSystemError().

Referenced by getAddress().

554  {
555  if (eventBase_) {
557  }
558 
559  // Start listening
560  for (auto& handler : sockets_) {
561  if (fsp::listen(handler.socket_, backlog) == -1) {
562  folly::throwSystemError(errno, "failed to listen on async server socket");
563  }
564  }
565 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void throwSystemError(Args &&...args)
Definition: Exception.h:76
static std::shared_ptr<AsyncServerSocket> folly::AsyncServerSocket::newSocket ( EventBase evb = nullptr)
inlinestatic

Helper function to create a shared_ptr<AsyncServerSocket>.

This passes in the correct destructor object, since AsyncServerSocket's destructor is protected and cannot be invoked directly.

Definition at line 226 of file AsyncServerSocket.h.

References AsyncServerSocket(), attachEventBase(), destroy(), detachEventBase(), and setShutdownSocketSet().

Referenced by folly::TestSSLServer::init(), folly::ScopedBoundPort::ScopedBoundPort(), folly::TEST(), and TEST().

227  {
228  return std::shared_ptr<AsyncServerSocket>(
229  new AsyncServerSocket(evb), Destructor());
230  }
AsyncServerSocket(AsyncServerSocket &&)=delete
CallbackInfo* folly::AsyncServerSocket::nextCallback ( )
inlineprivate

Definition at line 814 of file AsyncServerSocket.h.

References callbackIndex_, callbacks_, and deadlock::info().

Referenced by dispatchError(), and dispatchSocket().

814  {
815  CallbackInfo* info = &callbacks_[callbackIndex_];
816 
817  ++callbackIndex_;
818  if (callbackIndex_ >= callbacks_.size()) {
819  callbackIndex_ = 0;
820  }
821 
822  return info;
823  }
def info()
Definition: deadlock.py:447
std::vector< CallbackInfo > callbacks_
void folly::AsyncServerSocket::pauseAccepting ( )

Pause accepting connections.

startAccepting() may be called to resume accepting.

This method may only be called from the primary EventBase thread. If there are AcceptCallbacks being driven by other EventBase threads they may continue to receive callbacks for a short period of time after pauseAccepting() returns.

Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be called on the AcceptCallback objects simply due to a temporary pause. If the server socket is later destroyed while paused, acceptStopped() will be called all of the installed AcceptCallbacks.

Definition at line 723 of file AsyncServerSocket.cpp.

References accepting_, backoffTimeout_, folly::AsyncTimeout::cancelTimeout(), folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), and sockets_.

Referenced by getAddress().

723  {
724  if (eventBase_) {
726  }
727  accepting_ = false;
728  for (auto& handler : sockets_) {
729  handler.unregisterHandler();
730  }
731 
732  // If we were in the accept backoff state, disable the backoff timeout
733  if (backoffTimeout_) {
735  }
736 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
BackoffTimeout * backoffTimeout_
void folly::AsyncServerSocket::removeAcceptCallback ( AcceptCallback callback,
EventBase eventBase 
)

Remove an AcceptCallback.

This allows a single AcceptCallback to be removed from the round-robin pool.

This method must be invoked from the AsyncServerSocket's primary EventBase thread. Use EventBase::runInEventBaseThread() to schedule the operation in the correct EventBase if your code is not in the server socket's primary EventBase.

Given that the accept callback is being driven by a different EventBase, the AcceptCallback may continue to be invoked for a short period of time after removeAcceptCallback() returns in this thread. Once the other EventBase thread receives the notification to stop, it will call acceptStopped() on the callback to inform it that it is fully stopped and will not receive any new sockets.

If the last accept callback is removed while the socket is accepting, the socket will implicitly pause accepting. If a callback is later added, it will resume accepting immediately, without requiring startAccepting() to be invoked.

Parameters
callbackThe callback to uninstall.
eventBaseThe EventBase associated with this callback. This must be the same EventBase that was used when the callback was installed with addAcceptCallback().

Definition at line 636 of file AsyncServerSocket.cpp.

References accepting_, folly::AsyncServerSocket::AcceptCallback::acceptStopped(), folly::AsyncServerSocket::CallbackInfo::callback, callbackIndex_, callbacks_, folly::AsyncServerSocket::CallbackInfo::consumer, folly::EventBase::dcheckIsInEventBaseThread(), folly::AsyncServerSocket::CallbackInfo::eventBase, eventBase_, handler(), deadlock::info(), sockets_, folly::AsyncServerSocket::RemoteAcceptor::stop(), and uint32_t.

Referenced by getAddress(), and serverSocketSanityTest().

638  {
639  if (eventBase_) {
641  }
642 
643  // Find the matching AcceptCallback.
644  // We just do a simple linear search; we don't expect removeAcceptCallback()
645  // to be called frequently, and we expect there to only be a small number of
646  // callbacks anyway.
647  std::vector<CallbackInfo>::iterator it = callbacks_.begin();
648  uint32_t n = 0;
649  while (true) {
650  if (it == callbacks_.end()) {
651  throw std::runtime_error(
652  "AsyncServerSocket::removeAcceptCallback(): "
653  "accept callback not found");
654  }
655  if (it->callback == callback &&
656  (it->eventBase == eventBase || eventBase == nullptr)) {
657  break;
658  }
659  ++it;
660  ++n;
661  }
662 
663  // Remove this callback from callbacks_.
664  //
665  // Do this before invoking the acceptStopped() callback, in case
666  // acceptStopped() invokes one of our methods that examines callbacks_.
667  //
668  // Save a copy of the CallbackInfo first.
669  CallbackInfo info(*it);
670  callbacks_.erase(it);
671  if (n < callbackIndex_) {
672  // We removed an element before callbackIndex_. Move callbackIndex_ back
673  // one step, since things after n have been shifted back by 1.
674  --callbackIndex_;
675  } else {
676  // We removed something at or after callbackIndex_.
677  // If we removed the last element and callbackIndex_ was pointing at it,
678  // we need to reset callbackIndex_ to 0.
679  if (callbackIndex_ >= callbacks_.size()) {
680  callbackIndex_ = 0;
681  }
682  }
683 
684  if (info.consumer) {
685  // consumer could be nullptr is we run callbacks in primary event
686  // base
687  DCHECK(info.eventBase);
688  info.consumer->stop(info.eventBase, info.callback);
689  } else {
690  // callback invoked in the primary event base, just call directly
691  DCHECK(info.callback);
692  callback->acceptStopped();
693  }
694 
695  // If we are supposed to be accepting but the last accept callback
696  // was removed, unregister for events until a callback is added.
697  if (accepting_ && callbacks_.empty()) {
698  for (auto& handler : sockets_) {
699  handler.unregisterHandler();
700  }
701  }
702 }
def info()
Definition: deadlock.py:447
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
std::vector< CallbackInfo > callbacks_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void folly::AsyncServerSocket::setAcceptRateAdjustSpeed ( double  speed)
inline

Set the speed of adjusting connection accept rate.

Definition at line 576 of file AsyncServerSocket.h.

References acceptRateAdjustSpeed_, and setTosReflect().

576  {
577  acceptRateAdjustSpeed_ = speed;
578  }
void folly::AsyncServerSocket::setCloseOnExec ( bool  closeOnExec)
inline

Set whether or not the socket should close during exec() (FD_CLOEXEC). By default, this is enabled

Definition at line 685 of file AsyncServerSocket.h.

References closeOnExec_.

685  {
686  closeOnExec_ = closeOnExec;
687  }
void folly::AsyncServerSocket::setConnectionEventCallback ( ConnectionEventCallback *const  connectionEventCallback)
inline

Set the ConnectionEventCallback

Definition at line 721 of file AsyncServerSocket.h.

References connectionEventCallback_.

722  {
723  connectionEventCallback_ = connectionEventCallback;
724  }
ConnectionEventCallback * connectionEventCallback_
void folly::AsyncServerSocket::setKeepAliveEnabled ( bool  enabled)
inline

Set whether or not SO_KEEPALIVE should be enabled on the server socket (and thus on all subsequently-accepted connections). By default, keepalive is enabled.

Note that TCP keepalive usually only kicks in after the connection has been idle for several hours. Applications should almost always have their own, shorter idle timeout.

Definition at line 626 of file AsyncServerSocket.h.

References folly::errnoStr(), handler(), keepAliveEnabled_, folly::netops::setsockopt(), sockets_, and val.

626  {
627  keepAliveEnabled_ = enabled;
628 
629  for (auto& handler : sockets_) {
630  if (handler.socket_ < 0) {
631  continue;
632  }
633 
634  int val = (enabled) ? 1 : 0;
635  if (setsockopt(
636  handler.socket_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) !=
637  0) {
638  LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s"
639  << errnoStr(errno);
640  }
641  }
642  }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::vector< ServerEventHandler > sockets_
double val
Definition: String.cpp:273
void handler(int, siginfo_t *, void *)
fbstring errnoStr(int err)
Definition: String.cpp:463
void folly::AsyncServerSocket::setMaxAcceptAtOnce ( uint32_t  numConns)
inline

Set the maximum number of connections that will be accepted each time around the event loop.

This provides a very coarse-grained way of controlling how fast the AsyncServerSocket will accept connections. If you find that when your server is overloaded AsyncServerSocket accepts connections more quickly than your code can process them, you can try lowering this number so that fewer connections will be accepted each event loop iteration.

For more explicit control over the accept rate, you can also use pauseAccepting() to temporarily pause accepting when your server is overloaded, and then use startAccepting() later to resume accepting.

Definition at line 543 of file AsyncServerSocket.h.

References maxAcceptAtOnce_.

543  {
544  maxAcceptAtOnce_ = numConns;
545  }
void folly::AsyncServerSocket::setMaxNumMessagesInQueue ( uint32_t  num)
inline

Set the maximum number of unprocessed messages in NotificationQueue. No new message will be sent to that NotificationQueue if there are more than such number of unprocessed messages in that queue.

Only works if called before addAcceptCallback.

Definition at line 562 of file AsyncServerSocket.h.

References maxNumMsgsInQueue_.

562  {
563  maxNumMsgsInQueue_ = num;
564  }
void folly::AsyncServerSocket::setReusePortEnabled ( bool  enabled)
inline

Set whether or not SO_REUSEPORT should be enabled on the server socket, allowing multiple binds to the same port

Definition at line 655 of file AsyncServerSocket.h.

References handler(), reusePortEnabled_, folly::netops::setsockopt(), SO_REUSEPORT, sockets_, folly::throwSystemError(), and val.

655  {
656  reusePortEnabled_ = enabled;
657 
658  for (auto& handler : sockets_) {
659  if (handler.socket_ < 0) {
660  continue;
661  }
662 
663  int val = (enabled) ? 1 : 0;
664  if (setsockopt(
665  handler.socket_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) !=
666  0) {
667  LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
668  << errno;
669  folly::throwSystemError(errno, "failed to bind to async server socket");
670  }
671  }
672  }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::vector< ServerEventHandler > sockets_
double val
Definition: String.cpp:273
void handler(int, siginfo_t *, void *)
#define SO_REUSEPORT
void throwSystemError(Args &&...args)
Definition: Exception.h:76
void folly::AsyncServerSocket::setShutdownSocketSet ( const std::weak_ptr< ShutdownSocketSet > &  wNewSS)

Definition at line 179 of file AsyncServerSocket.cpp.

References h, folly::ShutdownSocketSet::remove(), folly::test::shutdownSocketSet, sockets_, and wShutdownSocketSet_.

Referenced by newSocket().

180  {
181  const auto newSS = wNewSS.lock();
182  const auto shutdownSocketSet = wShutdownSocketSet_.lock();
183 
184  if (shutdownSocketSet == newSS) {
185  return;
186  }
187 
188  if (shutdownSocketSet) {
189  for (auto& h : sockets_) {
190  shutdownSocketSet->remove(h.socket_);
191  }
192  }
193 
194  if (newSS) {
195  for (auto& h : sockets_) {
196  newSS->add(h.socket_);
197  }
198  }
199 
200  wShutdownSocketSet_ = wNewSS;
201 }
*than *hazptr_holder h
Definition: Hazptr.h:116
std::vector< ServerEventHandler > sockets_
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
ShutdownSocketSet shutdownSocketSet
void folly::AsyncServerSocket::setTFOEnabled ( bool  enabled,
uint32_t  maxTFOQueueSize 
)
inline

Tries to enable TFO if the machine supports it.

Definition at line 699 of file AsyncServerSocket.h.

References tfo_, and tfoMaxQueueSize_.

699  {
700  tfo_ = enabled;
701  tfoMaxQueueSize_ = maxTFOQueueSize;
702  }
void folly::AsyncServerSocket::setTosReflect ( bool  enable)

Enable/Disable TOS reflection for the server socket

Enable/Disable TOS reflection for the server socket If enabled, the 'accepted' connections will reflect the TOS derived from the client's connect request

Definition at line 758 of file AsyncServerSocket.cpp.

References handler(), folly::kIsLinux, folly::netops::setsockopt(), sockets_, TCP_SAVE_SYN, folly::throwSystemError(), tosReflect_, and val.

Referenced by setAcceptRateAdjustSpeed().

758  {
759  if (!kIsLinux || enable == false) {
760  tosReflect_ = false;
761  return;
762  }
763 
764  for (auto& handler : sockets_) {
765  if (handler.socket_ < 0) {
766  continue;
767  }
768 
769  int val = (enable) ? 1 : 0;
770  int ret = setsockopt(
771  handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val));
772 
773  if (ret == 0) {
774  VLOG(10) << "Enabled SYN save for socket " << handler.socket_;
775  } else {
776  folly::throwSystemError(errno, "failed to enable TOS reflect");
777  }
778  }
779  tosReflect_ = true;
780 }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::vector< ServerEventHandler > sockets_
double val
Definition: String.cpp:273
void handler(int, siginfo_t *, void *)
#define TCP_SAVE_SYN
void throwSystemError(Args &&...args)
Definition: Exception.h:76
constexpr auto kIsLinux
Definition: Portability.h:361
void folly::AsyncServerSocket::setupSocket ( int  fd,
int  family 
)
private

Definition at line 782 of file AsyncServerSocket.cpp.

References folly::ShutdownSocketSet::add(), closeOnExec_, folly::SocketAddress::describe(), folly::errnoStr(), keepAliveEnabled_, reusePortEnabled_, folly::setCloseOnExec(), folly::SocketAddress::setFromLocalAddress(), folly::netops::setsockopt(), folly::test::shutdownSocketSet, SO_REUSEPORT, tfo_, folly::detail::tfo_enable(), tfoMaxQueueSize_, folly::throwSystemError(), folly::WARNING, and wShutdownSocketSet_.

Referenced by bind(), createSocket(), and useExistingSockets().

782  {
783  // Put the socket in non-blocking mode
784  if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
785  folly::throwSystemError(errno, "failed to put socket in non-blocking mode");
786  }
787 
788  // Set reuseaddr to avoid 2MSL delay on server restart
789  int one = 1;
790  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
791  // This isn't a fatal error; just log an error message and continue
792  LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
793  }
794 
795  // Set reuseport to support multiple accept threads
796  int zero = 0;
797  if (reusePortEnabled_ &&
798  setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
799  LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
800  << errnoStr(errno);
801 #ifdef WIN32
802  folly::throwSystemError(errno, "failed to bind to the async server socket");
803 #else
804  SocketAddress address;
805  address.setFromLocalAddress(fd);
807  errno, "failed to bind to async server socket: " + address.describe());
808 #endif
809  }
810 
811  // Set keepalive as desired
812  if (setsockopt(
813  fd,
814  SOL_SOCKET,
815  SO_KEEPALIVE,
816  (keepAliveEnabled_) ? &one : &zero,
817  sizeof(int)) != 0) {
818  LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: "
819  << errnoStr(errno);
820  }
821 
822  // Setup FD_CLOEXEC flag
823  if (closeOnExec_ && (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
824  LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: "
825  << errnoStr(errno);
826  }
827 
828  // Set TCP nodelay if available, MAC OS X Hack
829  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
830 #ifndef TCP_NOPUSH
831  if (family != AF_UNIX) {
832  if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
833  // This isn't a fatal error; just log an error message and continue
834  LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: "
835  << errnoStr(errno);
836  }
837  }
838 #else
839  (void)family; // to avoid unused parameter warning
840 #endif
841 
842 #if FOLLY_ALLOW_TFO
843  if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
844  // This isn't a fatal error; just log an error message and continue
845  LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
846  << folly::errnoStr(errno);
847  }
848 #endif
849 
850  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
851  shutdownSocketSet->add(fd);
852  }
853 }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
ShutdownSocketSet shutdownSocketSet
int setCloseOnExec(int fd, int value)
int tfo_enable(int, size_t)
fbstring errnoStr(int err)
Definition: String.cpp:463
#define SO_REUSEPORT
void throwSystemError(Args &&...args)
Definition: Exception.h:76
bool folly::AsyncServerSocket::setZeroCopy ( bool  enable)

Definition at line 360 of file AsyncServerSocket.cpp.

References getSocket(), folly::netops::setsockopt(), SO_ZEROCOPY, and val.

Referenced by getSocket().

360  {
361  if (msgErrQueueSupported) {
362  int fd = getSocket();
363  int val = enable ? 1 : 0;
364  int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
365 
366  return (0 == ret);
367  }
368 
369  return false;
370 }
#define SO_ZEROCOPY
Definition: NetOps.h:51
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
double val
Definition: String.cpp:273
static constexpr bool msgErrQueueSupported
void folly::AsyncServerSocket::startAccepting ( )
virtual

Begin accepting connctions on this socket.

bind() and listen() must be called before calling startAccepting().

When a AsyncServerSocket is initially created, it will not begin accepting connections until at least one callback has been added and startAccepting() has been called. startAccepting() can also be used to resume accepting connections after a call to pauseAccepting().

If startAccepting() is called when there are no accept callbacks installed, the socket will not actually begin accepting until an accept callback is added.

This method may only be called from the primary EventBase thread.

Definition at line 704 of file AsyncServerSocket.cpp.

References accepting_, callbacks_, folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), folly::EventHandler::PERSIST, folly::EventHandler::READ, and sockets_.

Referenced by addAcceptCallback(), getAddress(), and serverSocketSanityTest().

704  {
705  if (eventBase_) {
707  }
708 
709  accepting_ = true;
710  if (callbacks_.empty()) {
711  // We can't actually begin accepting if no callbacks are defined.
712  // Wait until a callback is added to start accepting.
713  return;
714  }
715 
716  for (auto& handler : sockets_) {
717  if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
718  throw std::runtime_error("failed to register for accept events");
719  }
720  }
721 }
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
std::vector< CallbackInfo > callbacks_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
int folly::AsyncServerSocket::stopAccepting ( int  shutdownFlags = -1)

Shutdown the listen socket and notify all callbacks that accept has stopped, but don't close the socket. This invokes shutdown(2) with the supplied argument. Passing -1 will close the socket now. Otherwise, the close will be delayed until this object is destroyed.

Only use this if you have reason to pass special flags to shutdown. Otherwise just destroy the socket.

This method has no effect when a ShutdownSocketSet option is used.

Returns the result of shutdown on sockets_[n-1]

Definition at line 207 of file AsyncServerSocket.cpp.

References accepting_, backoffTimeout_, callbacks_, folly::ShutdownSocketSet::close(), folly::closeNoInt(), folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, handler(), pendingCloseSockets_, folly::shutdownNoInt(), folly::test::shutdownSocketSet, sockets_, and wShutdownSocketSet_.

Referenced by destroy(), and getAddress().

207  {
208  int result = 0;
209  for (auto& handler : sockets_) {
210  VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_;
211  }
212  if (eventBase_) {
214  }
215 
216  // When destroy is called, unregister and close the socket immediately.
217  accepting_ = false;
218 
219  // Close the sockets in reverse order as they were opened to avoid
220  // the condition where another process concurrently tries to open
221  // the same port, succeed to bind the first socket but fails on the
222  // second because it hasn't been closed yet.
223  for (; !sockets_.empty(); sockets_.pop_back()) {
224  auto& handler = sockets_.back();
225  handler.unregisterHandler();
226  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
227  shutdownSocketSet->close(handler.socket_);
228  } else if (shutdownFlags >= 0) {
229  result = shutdownNoInt(handler.socket_, shutdownFlags);
230  pendingCloseSockets_.push_back(handler.socket_);
231  } else {
232  closeNoInt(handler.socket_);
233  }
234  }
235 
236  // Destroy the backoff timout. This will cancel it if it is running.
237  delete backoffTimeout_;
238  backoffTimeout_ = nullptr;
239 
240  // Close all of the callback queues to notify them that they are being
241  // destroyed. No one should access the AsyncServerSocket any more once
242  // destroy() is called. However, clear out callbacks_ before invoking the
243  // accept callbacks just in case. This will potentially help us detect the
244  // bug if one of the callbacks calls addAcceptCallback() or
245  // removeAcceptCallback().
246  std::vector<CallbackInfo> callbacksCopy;
247  callbacks_.swap(callbacksCopy);
248  for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
249  it != callbacksCopy.end();
250  ++it) {
251  // consumer may not be set if we are running in primary event base
252  if (it->consumer) {
253  DCHECK(it->eventBase);
254  it->consumer->stop(it->eventBase, it->callback);
255  } else {
256  DCHECK(it->callback);
257  it->callback->acceptStopped();
258  }
259  }
260 
261  return result;
262 }
int shutdownNoInt(NetworkSocket fd, int how)
Definition: FileUtil.cpp:98
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
std::vector< ServerEventHandler > sockets_
void handler(int, siginfo_t *, void *)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
std::vector< CallbackInfo > callbacks_
std::vector< int > pendingCloseSockets_
ShutdownSocketSet shutdownSocketSet
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
BackoffTimeout * backoffTimeout_
void folly::AsyncServerSocket::useExistingSocket ( int  fd)

Create a AsyncServerSocket from an existing socket file descriptor.

useExistingSocket() will cause the AsyncServerSocket to take ownership of the specified file descriptor, and use it to listen for new connections. The AsyncServerSocket will close the file descriptor when it is destroyed.

useExistingSocket() must be called before bind() or listen().

The supplied file descriptor will automatically be put into non-blocking mode. The caller may have already directly called bind() and possibly listen on the file descriptor. If so the caller should skip calling the corresponding AsyncServerSocket::bind() and listen() methods.

On error a TTransportException will be thrown and the caller will retain ownership of the file descriptor.

Definition at line 327 of file AsyncServerSocket.cpp.

References useExistingSockets().

Referenced by getEventBase().

327  {
328  useExistingSockets({fd});
329 }
void useExistingSockets(const std::vector< int > &fds)
void folly::AsyncServerSocket::useExistingSockets ( const std::vector< int > &  fds)

Definition at line 295 of file AsyncServerSocket.cpp.

References folly::EventBase::dcheckIsInEventBaseThread(), eventBase_, folly::SocketAddress::getFamily(), noTransparentTls_, folly::SocketAddress::setFromLocalAddress(), folly::netops::setsockopt(), setupSocket(), and sockets_.

Referenced by getEventBase(), and useExistingSocket().

295  {
296  if (eventBase_) {
298  }
299 
300  if (sockets_.size() > 0) {
301  throw std::invalid_argument(
302  "cannot call useExistingSocket() on a "
303  "AsyncServerSocket that already has a socket");
304  }
305 
306  for (auto fd : fds) {
307  // Set addressFamily_ from this socket.
308  // Note that the socket may not have been bound yet, but
309  // setFromLocalAddress() will still work and get the correct address family.
310  // We will update addressFamily_ again anyway if bind() is called later.
311  SocketAddress address;
312  address.setFromLocalAddress(fd);
313 
314 #if __linux__
315  if (noTransparentTls_) {
316  // Ignore return value, errors are ok
317  setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
318  }
319 #endif
320 
321  setupSocket(fd, address.getFamily());
322  sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
323  sockets_.back().changeHandlerFD(fd);
324  }
325 }
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
std::vector< ServerEventHandler > sockets_
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void setupSocket(int fd, int family)

Member Data Documentation

double folly::AsyncServerSocket::acceptRate_
private

Definition at line 876 of file AsyncServerSocket.h.

Referenced by dispatchSocket(), and handlerReady().

double folly::AsyncServerSocket::acceptRateAdjustSpeed_
private
BackoffTimeout* folly::AsyncServerSocket::backoffTimeout_
private

Definition at line 880 of file AsyncServerSocket.h.

Referenced by enterBackoff(), pauseAccepting(), and stopAccepting().

uint32_t folly::AsyncServerSocket::callbackIndex_
private
bool folly::AsyncServerSocket::closeOnExec_
private

Definition at line 884 of file AsyncServerSocket.h.

Referenced by getCloseOnExec(), setCloseOnExec(), and setupSocket().

const uint32_t folly::AsyncServerSocket::kDefaultCallbackAcceptAtOnce = 5
static

Definition at line 208 of file AsyncServerSocket.h.

const uint32_t folly::AsyncServerSocket::kDefaultMaxAcceptAtOnce = 30
static

Definition at line 207 of file AsyncServerSocket.h.

const uint32_t folly::AsyncServerSocket::kDefaultMaxMessagesInQueue = 1024
static

Definition at line 209 of file AsyncServerSocket.h.

bool folly::AsyncServerSocket::keepAliveEnabled_
private

Definition at line 882 of file AsyncServerSocket.h.

Referenced by getKeepAliveEnabled(), setKeepAliveEnabled(), and setupSocket().

std::chrono::time_point<std::chrono::steady_clock> folly::AsyncServerSocket::lastAccepTimestamp_
private

Definition at line 877 of file AsyncServerSocket.h.

Referenced by handlerReady().

uint32_t folly::AsyncServerSocket::maxAcceptAtOnce_
private

Definition at line 873 of file AsyncServerSocket.h.

Referenced by getMaxAcceptAtOnce(), handlerReady(), and setMaxAcceptAtOnce().

uint32_t folly::AsyncServerSocket::maxNumMsgsInQueue_
private
bool folly::AsyncServerSocket::noTransparentTls_ {false}
private

Definition at line 886 of file AsyncServerSocket.h.

Referenced by bind(), bindSocket(), disableTransparentTls(), and useExistingSockets().

std::size_t folly::AsyncServerSocket::numDroppedConnections_
private

Definition at line 878 of file AsyncServerSocket.h.

Referenced by dispatchSocket(), getNumDroppedConnections(), and handlerReady().

std::vector<int> folly::AsyncServerSocket::pendingCloseSockets_
private

Definition at line 871 of file AsyncServerSocket.h.

Referenced by destroy(), and stopAccepting().

bool folly::AsyncServerSocket::reusePortEnabled_ {false}
private

Definition at line 883 of file AsyncServerSocket.h.

Referenced by getReusePortEnabled_(), setReusePortEnabled(), and setupSocket().

bool folly::AsyncServerSocket::tfo_ {false}
private

Definition at line 885 of file AsyncServerSocket.h.

Referenced by setTFOEnabled(), and setupSocket().

uint32_t folly::AsyncServerSocket::tfoMaxQueueSize_ {0}
private

Definition at line 887 of file AsyncServerSocket.h.

Referenced by setTFOEnabled(), and setupSocket().

bool folly::AsyncServerSocket::tosReflect_ {false}
private

Definition at line 890 of file AsyncServerSocket.h.

Referenced by getTosReflect(), handlerReady(), and setTosReflect().

std::weak_ptr<ShutdownSocketSet> folly::AsyncServerSocket::wShutdownSocketSet_
private

Definition at line 888 of file AsyncServerSocket.h.

Referenced by bind(), setShutdownSocketSet(), setupSocket(), and stopAccepting().


The documentation for this class was generated from the following files: