proxygen
AsyncServerSocket.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/SocketAddress.h>
20 #include <folly/String.h>
29 
30 #include <limits.h>
31 #include <stddef.h>
32 #include <exception>
33 #include <memory>
34 #include <vector>
35 
36 // Due to the way kernel headers are included, this may or may not be defined.
37 // Number pulled from 3.10 kernel headers.
38 #ifndef SO_REUSEPORT
39 #define SO_REUSEPORT 15
40 #endif
41 
42 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
43 #define SO_NO_TRANSPARENT_TLS 200
44 #endif
45 
46 namespace folly {
47 
66  public:
67  typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
68  // Disallow copy, move, and default construction.
70 
78  public:
79  virtual ~ConnectionEventCallback() = default;
80 
85  virtual void onConnectionAccepted(
86  const int socket,
87  const SocketAddress& addr) noexcept = 0;
88 
93  virtual void onConnectionAcceptError(const int err) noexcept = 0;
94 
99  virtual void onConnectionDropped(
100  const int socket,
101  const SocketAddress& addr) noexcept = 0;
102 
108  const int socket,
109  const SocketAddress& addr) noexcept = 0;
110 
116  const int socket,
117  const SocketAddress& addr) noexcept = 0;
118 
123  virtual void onBackoffStarted() noexcept = 0;
124 
130  virtual void onBackoffEnded() noexcept = 0;
131 
135  virtual void onBackoffError() noexcept = 0;
136  };
137 
139  public:
140  virtual ~AcceptCallback() = default;
141 
158  virtual void connectionAccepted(
159  int fd,
160  const SocketAddress& clientAddr) noexcept = 0;
161 
172  virtual void acceptError(const std::exception& ex) noexcept = 0;
173 
194  virtual void acceptStarted() noexcept {}
195 
204  virtual void acceptStopped() noexcept {}
205  };
206 
218  explicit AsyncServerSocket(EventBase* eventBase = nullptr);
219 
226  static std::shared_ptr<AsyncServerSocket> newSocket(
227  EventBase* evb = nullptr) {
228  return std::shared_ptr<AsyncServerSocket>(
229  new AsyncServerSocket(evb), Destructor());
230  }
231 
232  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
233 
251  void destroy() override;
252 
260  void attachEventBase(EventBase* eventBase);
261 
268  void detachEventBase();
269 
273  EventBase* getEventBase() const override {
274  return eventBase_;
275  }
276 
295  void useExistingSocket(int fd);
296  void useExistingSockets(const std::vector<int>& fds);
297 
301  std::vector<int> getSockets() const {
302  std::vector<int> sockets;
303  for (auto& handler : sockets_) {
304  sockets.push_back(handler.socket_);
305  }
306  return sockets;
307  }
308 
312  int getSocket() const {
313  if (sockets_.size() > 1) {
314  VLOG(2) << "Warning: getSocket can return multiple fds, "
315  << "but getSockets was not called, so only returning the first";
316  }
317  if (sockets_.size() == 0) {
318  return -1;
319  } else {
320  return sockets_[0].socket_;
321  }
322  }
323 
324  /* enable zerocopy support for the server sockets - the s = accept sockets
325  * inherit it
326  */
327  bool setZeroCopy(bool enable);
328 
336  virtual void bind(const SocketAddress& address);
337 
345  virtual void bind(const std::vector<IPAddress>& ipAddresses, uint16_t port);
346 
354  virtual void bind(uint16_t port);
355 
361  void getAddress(SocketAddress* addressReturn) const override;
362 
369  SocketAddress ret;
370  getAddress(&ret);
371  return ret;
372  }
373 
379  std::vector<SocketAddress> getAddresses() const;
380 
401  virtual void listen(int backlog);
402 
436  virtual void addAcceptCallback(
437  AcceptCallback* callback,
438  EventBase* eventBase,
439  uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
440 
469  void removeAcceptCallback(AcceptCallback* callback, EventBase* eventBase);
470 
487  virtual void startAccepting();
488 
504  void pauseAccepting();
505 
519  int stopAccepting(int shutdownFlags = -1);
520 
526  return maxAcceptAtOnce_;
527  }
528 
543  void setMaxAcceptAtOnce(uint32_t numConns) {
544  maxAcceptAtOnce_ = numConns;
545  }
546 
552  return maxNumMsgsInQueue_;
553  }
554 
563  maxNumMsgsInQueue_ = num;
564  }
565 
569  double getAcceptRateAdjustSpeed() const {
570  return acceptRateAdjustSpeed_;
571  }
572 
576  void setAcceptRateAdjustSpeed(double speed) {
577  acceptRateAdjustSpeed_ = speed;
578  }
579 
583  void setTosReflect(bool enable);
584 
585  bool getTosReflect() {
586  return tosReflect_;
587  }
588 
592  std::size_t getNumDroppedConnections() const {
593  return numDroppedConnections_;
594  }
595 
605  if (eventBase_) {
607  }
608  int64_t numMsgs = 0;
609  for (const auto& callback : callbacks_) {
610  if (callback.consumer) {
611  numMsgs += callback.consumer->getQueue()->size();
612  }
613  }
614  return numMsgs;
615  }
616 
626  void setKeepAliveEnabled(bool enabled) {
627  keepAliveEnabled_ = enabled;
628 
629  for (auto& handler : sockets_) {
630  if (handler.socket_ < 0) {
631  continue;
632  }
633 
634  int val = (enabled) ? 1 : 0;
635  if (setsockopt(
636  handler.socket_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) !=
637  0) {
638  LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s"
639  << errnoStr(errno);
640  }
641  }
642  }
643 
647  bool getKeepAliveEnabled() const {
648  return keepAliveEnabled_;
649  }
650 
655  void setReusePortEnabled(bool enabled) {
656  reusePortEnabled_ = enabled;
657 
658  for (auto& handler : sockets_) {
659  if (handler.socket_ < 0) {
660  continue;
661  }
662 
663  int val = (enabled) ? 1 : 0;
664  if (setsockopt(
665  handler.socket_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) !=
666  0) {
667  LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
668  << errno;
669  folly::throwSystemError(errno, "failed to bind to async server socket");
670  }
671  }
672  }
673 
677  bool getReusePortEnabled_() const {
678  return reusePortEnabled_;
679  }
680 
685  void setCloseOnExec(bool closeOnExec) {
686  closeOnExec_ = closeOnExec;
687  }
688 
692  bool getCloseOnExec() const {
693  return closeOnExec_;
694  }
695 
699  void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) {
700  tfo_ = enabled;
701  tfoMaxQueueSize_ = maxTFOQueueSize;
702  }
703 
708  noTransparentTls_ = true;
709  }
710 
714  bool getAccepting() const {
715  return accepting_;
716  }
717 
722  ConnectionEventCallback* const connectionEventCallback) {
723  connectionEventCallback_ = connectionEventCallback;
724  }
725 
731  }
732 
733  protected:
739  ~AsyncServerSocket() override;
740 
741  private:
742  enum class MessageType { MSG_NEW_CONN = 0, MSG_ERROR = 1 };
743 
744  struct QueueMessage {
746  int fd;
747  int err;
750  };
751 
761  class RemoteAcceptor : private NotificationQueue<QueueMessage>::Consumer {
762  public:
763  explicit RemoteAcceptor(
764  AcceptCallback* callback,
765  ConnectionEventCallback* connectionEventCallback)
766  : callback_(callback),
767  connectionEventCallback_(connectionEventCallback) {}
768 
769  ~RemoteAcceptor() override = default;
770 
771  void start(EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
772  void stop(EventBase* eventBase, AcceptCallback* callback);
773 
774  void messageAvailable(QueueMessage&& message) noexcept override;
775 
777  return &queue_;
778  }
779 
780  private:
783 
785  };
786 
791  struct CallbackInfo {
793  : callback(cb), eventBase(evb), consumer(nullptr) {}
794 
797 
799  };
800 
801  class BackoffTimeout;
802 
803  virtual void
804  handlerReady(uint16_t events, int socket, sa_family_t family) noexcept;
805 
806  int createSocket(int family);
807  void setupSocket(int fd, int family);
808  void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
809  void dispatchSocket(int socket, SocketAddress&& address);
810  void dispatchError(const char* msg, int errnoValue);
811  void enterBackoff();
812  void backoffTimeoutExpired();
813 
816 
817  ++callbackIndex_;
818  if (callbackIndex_ >= callbacks_.size()) {
819  callbackIndex_ = 0;
820  }
821 
822  return info;
823  }
824 
827  EventBase* eventBase,
828  int socket,
830  sa_family_t addressFamily)
831  : EventHandler(eventBase, socket),
832  eventBase_(eventBase),
833  socket_(socket),
834  parent_(parent),
835  addressFamily_(addressFamily) {}
836 
838  : EventHandler(other.eventBase_, other.socket_),
839  eventBase_(other.eventBase_),
840  socket_(other.socket_),
841  parent_(other.parent_),
842  addressFamily_(other.addressFamily_) {}
843 
845  if (this != &other) {
846  eventBase_ = other.eventBase_;
847  socket_ = other.socket_;
848  parent_ = other.parent_;
849  addressFamily_ = other.addressFamily_;
850 
851  detachEventBase();
853  changeHandlerFD(other.socket_);
854  }
855  return *this;
856  }
857 
858  // Inherited from EventHandler
859  void handlerReady(uint16_t events) noexcept override {
860  parent_->handlerReady(events, socket_, addressFamily_);
861  }
862 
864  int socket_;
866  sa_family_t addressFamily_;
867  };
868 
870  std::vector<ServerEventHandler> sockets_;
871  std::vector<int> pendingCloseSockets_;
875  double acceptRateAdjustSpeed_; // 0 to disable auto adjust
876  double acceptRate_;
877  std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
881  std::vector<CallbackInfo> callbacks_;
883  bool reusePortEnabled_{false};
885  bool tfo_{false};
886  bool noTransparentTls_{false};
888  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
890  bool tosReflect_{false};
891 };
892 
893 } // namespace folly
ServerEventHandler(EventBase *eventBase, int socket, AsyncServerSocket *parent, sa_family_t addressFamily)
Definition: test.c:42
AsyncServerSocket(AsyncServerSocket &&)=delete
def info()
Definition: deadlock.py:447
std::size_t getNumDroppedConnections() const
std::vector< int > getSockets() const
void setConnectionEventCallback(ConnectionEventCallback *const connectionEventCallback)
virtual void handlerReady(uint16_t events, int socket, sa_family_t family) noexcept
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
ConnectionEventCallback * getConnectionEventCallback() const
EventBase * getEventBase() const override
static const uint32_t kDefaultMaxMessagesInQueue
void setMaxAcceptAtOnce(uint32_t numConns)
std::vector< ServerEventHandler > sockets_
void setAcceptRateAdjustSpeed(double speed)
uint32_t getMaxNumMessagesInQueue() const
virtual void onConnectionDequeuedByAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
RemoteAcceptor(AcceptCallback *callback, ConnectionEventCallback *connectionEventCallback)
int stopAccepting(int shutdownFlags=-1)
double val
Definition: String.cpp:273
virtual void bind(const SocketAddress &address)
void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void listen(int backlog)
#define nullptr
Definition: http_parser.c:41
int64_t getNumPendingMessagesInQueue() const
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void dispatchError(const char *msg, int errnoValue)
void handler(int, siginfo_t *, void *)
void attachEventBase(EventBase *eventBase)
ServerEventHandler(const ServerEventHandler &other)
void useExistingSockets(const std::vector< int > &fds)
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
static void stop()
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
void setReusePortEnabled(bool enabled)
bool getReusePortEnabled_() const
std::vector< int > pendingCloseSockets_
AsyncServerSocket::UniquePtr socket_
CallbackInfo * nextCallback()
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
NotificationQueue< QueueMessage > queue_
static const uint32_t kDefaultMaxAcceptAtOnce
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
double getAcceptRateAdjustSpeed() const
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wNewSS)
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
auto start
fbstring errnoStr(int err)
Definition: String.cpp:463
NotificationQueue< QueueMessage > * getQueue()
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
SocketAddress getAddress() const
void dispatchSocket(int socket, SocketAddress &&address)
uint32_t getMaxAcceptAtOnce() const
static const uint32_t kDefaultCallbackAcceptAtOnce
ConnectionEventCallback * connectionEventCallback_
virtual void onConnectionEnqueuedForAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
std::vector< SocketAddress > getAddresses() const
void setCloseOnExec(bool closeOnExec)
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
const char * string
Definition: Conv.cpp:212
CallbackInfo(AcceptCallback *cb, EventBase *evb)
void handlerReady(uint16_t events) noexceptoverride
void setKeepAliveEnabled(bool enabled)
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAccepted(const int socket, const SocketAddress &addr) noexcept=0
void setMaxNumMessagesInQueue(uint32_t num)
int errnoValue
Definition: Subprocess.cpp:225
folly::Function< void()> callback_
#define SO_REUSEPORT
void throwSystemError(Args &&...args)
Definition: Exception.h:76
virtual void onConnectionAcceptError(const int err) noexcept=0
void setupSocket(int fd, int family)
void setTosReflect(bool enable)
BackoffTimeout * backoffTimeout_
ThreadPoolListHook * addr
ServerEventHandler & operator=(const ServerEventHandler &other)
folly::Function< void()> parent
Definition: AtFork.cpp:34
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)