17 #ifndef __STDC_FORMAT_MACROS 18 #define __STDC_FORMAT_MACROS 36 #include <sys/types.h> 43 #define TCP_SAVE_SYN 27 47 #define TCP_SAVED_SYN 28 51 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 55 #endif // FOLLY_HAVE_MSG_ERRQUEUE 63 int old_flags = fcntl(fd, F_GETFD, 0);
73 new_flags = old_flags | FD_CLOEXEC;
75 new_flags = old_flags & ~FD_CLOEXEC;
79 return fcntl(fd, F_SETFD, new_flags);
87 queue_.setMaxQueueSize(maxInQueue);
90 callback_->acceptStarted();
91 this->startConsuming(eventBase, &queue_);
93 throw std::invalid_argument(
94 "unable to start waiting on accept " 95 "notification queue in the specified " 104 callback->acceptStopped();
107 throw std::invalid_argument(
108 "unable to start waiting on accept " 109 "notification queue in the specified " 120 msg.fd, msg.address);
126 std::runtime_error ex(msg.msg);
131 LOG(ERROR) <<
"invalid accept notification message type " 133 std::runtime_error ex(
134 "received invalid accept notification message type");
151 socket_->backoffTimeoutExpired();
180 const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
181 const auto newSS = wNewSS.lock();
196 newSS->add(
h.socket_);
210 VLOG(10) <<
"AsyncServerSocket::stopAccepting " <<
this <<
handler.socket_;
223 for (; !sockets_.empty(); sockets_.pop_back()) {
224 auto&
handler = sockets_.back();
228 }
else if (shutdownFlags >= 0) {
246 std::vector<CallbackInfo> callbacksCopy;
248 for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
249 it != callbacksCopy.end();
253 DCHECK(it->eventBase);
254 it->consumer->stop(it->eventBase, it->callback);
256 DCHECK(it->callback);
257 it->callback->acceptStopped();
280 handler.attachEventBase(eventBase);
301 throw std::invalid_argument(
302 "cannot call useExistingSocket() on a " 303 "AsyncServerSocket that already has a socket");
306 for (
auto fd : fds) {
317 setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS,
nullptr, 0);
323 sockets_.back().changeHandlerFD(fd);
334 bool isExistingSocket) {
335 sockaddr_storage addrStorage;
337 sockaddr* saddr =
reinterpret_cast<sockaddr*
>(&addrStorage);
340 if (!isExistingSocket) {
344 errno,
"failed to bind to async server socket: " + address.
describe());
350 setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS,
nullptr, 0);
355 if (!isExistingSocket) {
361 if (msgErrQueueSupported) {
363 int val = enable ? 1 : 0;
386 throw std::invalid_argument(
387 "Attempted to bind address to socket with " 388 "different address family");
392 throw std::invalid_argument(
"Attempted to bind to multiple fds");
399 const std::vector<IPAddress>& ipAddresses,
401 if (ipAddresses.empty()) {
402 throw std::invalid_argument(
"No ip addresses were provided");
405 throw std::invalid_argument(
406 "Cannot call bind on a AsyncServerSocket " 407 "that already has a socket.");
410 for (
const IPAddress& ipAddress : ipAddresses) {
417 throw std::runtime_error(
418 "did not bind any async server socket for port and addresses");
423 struct addrinfo hints, *res0;
424 char sport[
sizeof(
"65536")];
426 memset(&hints, 0,
sizeof(hints));
427 hints.ai_family = AF_UNSPEC;
428 hints.ai_socktype = SOCK_STREAM;
429 hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
430 snprintf(sport,
sizeof(sport),
"%u", port);
434 constexpr
const char* kWildcardNode =
kIsWindows ?
"" :
nullptr;
435 if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
436 throw std::invalid_argument(
437 "Attempted to bind address to socket with " 445 auto setupAddress = [&](
struct addrinfo* res) {
446 int s =
fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
448 if (s < 0 && errno == EAFNOSUPPORT) {
460 if (res->ai_family == AF_INET6) {
464 setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only,
sizeof(v6only)));
468 if (
fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
471 "failed to bind to async server socket for port ",
480 setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS,
nullptr, 0);
490 const int kNumTries = 25;
491 for (
int tries = 1;
true; tries++) {
498 for (
struct addrinfo* res = res0; res; res = res->ai_next) {
499 if (res->ai_family == AF_INET6) {
506 if (
sockets_.size() == 1 && port == 0) {
509 snprintf(sport,
sizeof(sport),
"%u", address.
getPort());
511 CHECK_EQ(0, getaddrinfo(
nullptr, sport, &hints, &res0));
515 for (
struct addrinfo* res = res0; res; res = res->ai_next) {
516 if (res->ai_family != AF_INET6) {
520 }
catch (
const std::system_error&) {
525 if (port == 0 && !
sockets_.empty() && tries != kNumTries) {
527 if (
socket.socket_ <= 0) {
537 snprintf(sport,
sizeof(sport),
"%u", port);
539 CHECK_EQ(0, getaddrinfo(
nullptr, sport, &hints, &res0));
550 throw std::runtime_error(
"did not bind any async server socket for port");
570 <<
"Warning: getAddress() called and multiple addresses available (" 571 <<
sockets_.size() <<
"). Returning only the first one.";
578 auto tsaVec = std::vector<SocketAddress>(
sockets_.size());
579 auto tsaIter = tsaVec.begin();
581 (tsaIter++)->setFromLocalAddress(
socket.socket_);
603 if (runStartAccepting) {
647 std::vector<CallbackInfo>::iterator it =
callbacks_.begin();
651 throw std::runtime_error(
652 "AsyncServerSocket::removeAcceptCallback(): " 653 "accept callback not found");
655 if (it->callback == callback &&
656 (it->eventBase == eventBase || eventBase ==
nullptr)) {
718 throw std::runtime_error(
"failed to register for accept events");
769 int val = (enable) ? 1 : 0;
774 VLOG(10) <<
"Enabled SYN save for socket " <<
handler.socket_;
784 if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
790 if (
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one)) != 0) {
792 LOG(ERROR) <<
"failed to set SO_REUSEADDR on async server socket " << errno;
799 LOG(ERROR) <<
"failed to set SO_REUSEPORT on async server socket " 807 errno,
"failed to bind to async server socket: " + address.
describe());
818 LOG(ERROR) <<
"failed to set SO_KEEPALIVE on async server socket: " 824 LOG(ERROR) <<
"failed to set FD_CLOEXEC on async server socket: " 831 if (family != AF_UNIX) {
832 if (
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one,
sizeof(one)) != 0) {
834 LOG(ERROR) <<
"failed to set TCP_NODELAY on async server socket: " 845 LOG(
WARNING) <<
"failed to set TCP_FASTOPEN on async server socket: " 858 sa_family_t addressFamily)
noexcept {
867 sockaddr_storage addrStorage;
868 socklen_t addrLen =
sizeof(addrStorage);
869 sockaddr* saddr =
reinterpret_cast<sockaddr*
>(&addrStorage);
872 saddr->sa_family = addressFamily;
873 if (addressFamily == AF_UNIX) {
874 addrLen =
sizeof(
struct sockaddr_un);
879 int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
881 int clientSocket =
accept(fd, saddr, &addrLen);
893 std::array<uint32_t, 64>
buffer;
894 socklen_t len =
sizeof(
buffer);
900 if (addressFamily == AF_INET6) {
901 tosWord = (tosWord & 0x0FC00000) >> 20;
908 }
else if (addressFamily == AF_INET) {
909 tosWord = (tosWord & 0x00FC0000) >> 16;
911 clientSocket, IPPROTO_IP, IP_TOS, &tosWord,
sizeof(tosWord));
915 LOG(ERROR) <<
"Unable to set TOS for accepted socket " 919 LOG(ERROR) <<
"Unable to get SYN packet for accepted socket " 924 std::chrono::time_point<std::chrono::steady_clock> nowMs =
926 auto timeSinceLastAccept = std::max<int64_t>(
928 nowMs.time_since_epoch().count() -
937 if (clientSocket >= 0) {
941 clientSocket, address);
948 if (clientSocket < 0) {
949 if (errno == EAGAIN) {
953 }
else if (errno == EMFILE || errno == ENFILE) {
957 LOG(ERROR) <<
"accept failed: out of file descriptors; entering accept " 972 #ifndef SOCK_NONBLOCK 974 if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
977 "failed to set accepted socket to non-blocking mode", errno);
1029 static const double kAcceptRateDecreaseSpeed = 0.1;
1043 LOG_EVERY_N(ERROR, 100) <<
"failed to dispatch newly accepted socket:" 1044 <<
" all accept callback queues are full";
1069 std::runtime_error ex(
1070 std::string(msgstr) + folly::to<std::string>(errnoValue));
1083 LOG_EVERY_N(ERROR, 100)
1084 <<
"failed to dispatch accept error: all accept" 1085 <<
" callback queues are full: error msg: " << msg.
msg <<
": " 1099 }
catch (
const std::bad_alloc&) {
1103 LOG(ERROR) <<
"failed to allocate AsyncServerSocket backoff" 1104 <<
" timer; unable to temporarly pause accepting";
1123 LOG(ERROR) <<
"failed to schedule AsyncServerSocket backoff timer;" 1124 <<
"unable to temporarly pause accepting";
1168 <<
"failed to re-enable AsyncServerSocket accepts after backoff; "
std::vector< uint8_t > buffer(kBufferSize+16)
void setMaxReadAtOnce(uint32_t maxAtOnce)
AsyncServerSocket(AsyncServerSocket &&)=delete
virtual void onBackoffError() noexcept=0
int shutdownNoInt(NetworkSocket fd, int how)
virtual void handlerReady(uint16_t events, int socket, sa_family_t family) noexcept
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
void messageAvailable(QueueMessage &&message) noexceptoverride
BackoffTimeout(AsyncServerSocket *socket)
static const uint32_t kDefaultMaxMessagesInQueue
uint32_t maxNumMsgsInQueue_
std::vector< ServerEventHandler > sockets_
virtual void onBackoffStarted() noexcept=0
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
void timeoutExpired() noexceptoverride
virtual void onConnectionDequeuedByAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
socklen_t getActualSize() const
int stopAccepting(int shutdownFlags=-1)
void setFromSockaddr(const struct sockaddr *address)
EventBase * getEventBase()
virtual void bind(const SocketAddress &address)
RemoteAcceptor * consumer
—— Concurrent Priority Queue Implementation ——
std::string describe() const
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void acceptStarted() noexcept
int createSocket(int family)
virtual void listen(int backlog)
AcceptCallback * callback
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue)
void dispatchError(const char *msg, int errnoValue)
void handler(int, siginfo_t *, void *)
void attachEventBase(EventBase *eventBase)
void useExistingSockets(const std::vector< int > &fds)
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
static int getPortFrom(const struct sockaddr *address)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
~AsyncServerSocket() override
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
sa_family_t getFamily() const
std::vector< int > pendingCloseSockets_
void useExistingSocket(int fd)
virtual void connectionAccepted(int fd, const SocketAddress &clientAddr) noexcept=0
AsyncServerSocket::UniquePtr socket_
CallbackInfo * nextCallback()
bool runInEventBaseThread(void(*fn)(T *), T *arg)
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
virtual void onBackoffEnded() noexcept=0
AsyncServerSocket * socket_
NotificationQueue< QueueMessage > queue_
ShutdownSocketSet shutdownSocketSet
static const uint32_t kDefaultMaxAcceptAtOnce
NetworkSocket socket(int af, int type, int protocol)
socklen_t getAddress(sockaddr_storage *addr) const
int listen(NetworkSocket s, int backlog)
std::size_t numDroppedConnections_
int setCloseOnExec(int fd, int value)
virtual void startAccepting()
static constexpr bool msgErrQueueSupported
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wNewSS)
void dcheckIsInEventBaseThread() const
int tfo_enable(int, size_t)
virtual void acceptStopped() noexcept
fbstring errnoStr(int err)
NotificationQueue< QueueMessage > * getQueue()
SocketAddress getAddress() const
void backoffTimeoutExpired()
void dispatchSocket(int socket, SocketAddress &&address)
uint32_t tfoMaxQueueSize_
static const uint32_t kDefaultCallbackAcceptAtOnce
ConnectionEventCallback * connectionEventCallback_
virtual void onConnectionEnqueuedForAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
std::vector< SocketAddress > getAddresses() const
uint32_t maxAcceptAtOnce_
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
void setFromLocalAddress(int socket)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
static const char * getFamilyNameFrom(const struct sockaddr *address, const char *defaultResult=nullptr)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
bool scheduleTimeout(uint32_t milliseconds)
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAccepted(const int socket, const SocketAddress &addr) noexcept=0
constexpr auto kIsWindows
bool setZeroCopy(bool enable)
void throwSystemError(Args &&...args)
virtual void onConnectionAcceptError(const int err) noexcept=0
AcceptCallback * callback_
void stop(EventBase *eventBase, AcceptCallback *callback)
void setupSocket(int fd, int family)
void setTosReflect(bool enable)
BackoffTimeout * backoffTimeout_
ThreadPoolListHook * addr
virtual void acceptError(const std::exception &ex) noexcept=0
double acceptRateAdjustSpeed_
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
void disableTransparentTls()
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)