39 #define SO_REUSEPORT 15 42 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS 43 #define SO_NO_TRANSPARENT_TLS 200 67 typedef std::unique_ptr<AsyncServerSocket, Destructor>
UniquePtr;
158 virtual void connectionAccepted(
172 virtual void acceptError(
const std::exception& ex) noexcept = 0;
228 return std::shared_ptr<AsyncServerSocket>(
302 std::vector<int> sockets;
304 sockets.push_back(
handler.socket_);
314 VLOG(2) <<
"Warning: getSocket can return multiple fds, " 315 <<
"but getSockets was not called, so only returning the first";
345 virtual void bind(
const std::vector<IPAddress>& ipAddresses,
uint16_t port);
401 virtual void listen(
int backlog);
439 uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
610 if (callback.consumer) {
611 numMsgs += callback.consumer->getQueue()->size();
634 int val = (enabled) ? 1 : 0;
636 handler.socket_, SOL_SOCKET, SO_KEEPALIVE, &val,
sizeof(val)) !=
638 LOG(ERROR) <<
"failed to set SO_KEEPALIVE on async server socket: %s" 663 int val = (enabled) ? 1 : 0;
667 LOG(ERROR) <<
"failed to set SO_REUSEPORT on async server socket " 793 : callback(cb), eventBase(evb), consumer(
nullptr) {}
830 sa_family_t addressFamily)
835 addressFamily_(addressFamily) {}
841 parent_(other.parent_),
842 addressFamily_(other.addressFamily_) {}
845 if (
this != &other) {
853 changeHandlerFD(other.
socket_);
860 parent_->handlerReady(events,
socket_, addressFamily_);
ServerEventHandler(EventBase *eventBase, int socket, AsyncServerSocket *parent, sa_family_t addressFamily)
AsyncServerSocket(AsyncServerSocket &&)=delete
std::size_t getNumDroppedConnections() const
virtual void onBackoffError() noexcept=0
std::vector< int > getSockets() const
void setConnectionEventCallback(ConnectionEventCallback *const connectionEventCallback)
virtual void handlerReady(uint16_t events, int socket, sa_family_t family) noexcept
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
ConnectionEventCallback * getConnectionEventCallback() const
EventBase * getEventBase() const override
static const uint32_t kDefaultMaxMessagesInQueue
uint32_t maxNumMsgsInQueue_
void setMaxAcceptAtOnce(uint32_t numConns)
std::vector< ServerEventHandler > sockets_
virtual void onBackoffStarted() noexcept=0
void setAcceptRateAdjustSpeed(double speed)
uint32_t getMaxNumMessagesInQueue() const
virtual void onConnectionDequeuedByAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
RemoteAcceptor(AcceptCallback *callback, ConnectionEventCallback *connectionEventCallback)
int stopAccepting(int shutdownFlags=-1)
virtual void bind(const SocketAddress &address)
void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize)
RemoteAcceptor * consumer
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void acceptStarted() noexcept
int createSocket(int family)
virtual void listen(int backlog)
AcceptCallback * callback
int64_t getNumPendingMessagesInQueue() const
void bindSocket(int fd, const SocketAddress &address, bool isExistingSocket)
void dispatchError(const char *msg, int errnoValue)
void handler(int, siginfo_t *, void *)
void attachEventBase(EventBase *eventBase)
ServerEventHandler(const ServerEventHandler &other)
void useExistingSockets(const std::vector< int > &fds)
std::chrono::time_point< std::chrono::steady_clock > lastAccepTimestamp_
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
~AsyncServerSocket() override
ConnectionEventCallback * connectionEventCallback_
std::vector< CallbackInfo > callbacks_
void setReusePortEnabled(bool enabled)
bool getReusePortEnabled_() const
std::vector< int > pendingCloseSockets_
void useExistingSocket(int fd)
AsyncServerSocket::UniquePtr socket_
CallbackInfo * nextCallback()
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
virtual void onBackoffEnded() noexcept=0
bool getKeepAliveEnabled() const
NotificationQueue< QueueMessage > queue_
static const uint32_t kDefaultMaxAcceptAtOnce
NetworkSocket socket(int af, int type, int protocol)
double getAcceptRateAdjustSpeed() const
std::size_t numDroppedConnections_
AsyncServerSocket * parent_
virtual ~ConnectionEventCallback()=default
virtual void startAccepting()
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wNewSS)
void dcheckIsInEventBaseThread() const
virtual void acceptStopped() noexcept
fbstring errnoStr(int err)
NotificationQueue< QueueMessage > * getQueue()
sa_family_t addressFamily_
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
SocketAddress getAddress() const
void backoffTimeoutExpired()
void dispatchSocket(int socket, SocketAddress &&address)
uint32_t getMaxAcceptAtOnce() const
uint32_t tfoMaxQueueSize_
static const uint32_t kDefaultCallbackAcceptAtOnce
ConnectionEventCallback * connectionEventCallback_
virtual void onConnectionEnqueuedForAcceptorCallback(const int socket, const SocketAddress &addr) noexcept=0
std::vector< SocketAddress > getAddresses() const
uint32_t maxAcceptAtOnce_
void setCloseOnExec(bool closeOnExec)
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
CallbackInfo(AcceptCallback *cb, EventBase *evb)
void handlerReady(uint16_t events) noexceptoverride
void setKeepAliveEnabled(bool enabled)
virtual void onConnectionDropped(const int socket, const SocketAddress &addr) noexcept=0
virtual void onConnectionAccepted(const int socket, const SocketAddress &addr) noexcept=0
void setMaxNumMessagesInQueue(uint32_t num)
folly::Function< void()> callback_
bool setZeroCopy(bool enable)
void throwSystemError(Args &&...args)
virtual void onConnectionAcceptError(const int err) noexcept=0
AcceptCallback * callback_
void setupSocket(int fd, int family)
void setTosReflect(bool enable)
BackoffTimeout * backoffTimeout_
ThreadPoolListHook * addr
bool getCloseOnExec() const
bool getAccepting() const
ServerEventHandler & operator=(const ServerEventHandler &other)
folly::Function< void()> parent
double acceptRateAdjustSpeed_
void disableTransparentTls()
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)