64 template <
typename Pipeline>
73 : pipeline_(
std::
move(pipeline)) {
74 pipeline_->setPipelineManager(
this);
78 auto ew = folly::make_exception_wrapper<AcceptorException>(
80 pipeline_->readException(ew);
90 auto ew = folly::make_exception_wrapper<AcceptorException>(
92 pipeline_->readException(ew);
97 CHECK(p == pipeline_.get());
102 pipeline_->transportActive();
111 pipeline_->setPipelineManager(
nullptr);
117 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
121 acceptPipelineFactory_(acceptPipelineFactory),
122 childPipelineFactory_(childPipelineFactory) {
127 SSLStats* stats =
nullptr)
override {
130 acceptPipeline_ = acceptPipelineFactory_->newPipeline(
this);
132 if (childPipelineFactory_) {
136 acceptPipeline_->addBack(
this);
138 acceptPipeline_->finalize();
142 if (conn.type() !=
typeid(
ConnInfo&)) {
146 auto connInfo = boost::get<ConnInfo&>(conn);
150 auto tInfoPtr = std::make_shared<TransportInfo>(connInfo.tinfo);
151 tInfoPtr->localAddr =
152 std::make_shared<folly::SocketAddress>(accConfig_.bindAddress);
153 transport->getLocalAddress(tInfoPtr->localAddr.get());
154 tInfoPtr->remoteAddr =
155 std::make_shared<folly::SocketAddress>(*connInfo.clientAddr);
156 tInfoPtr->appProtocol =
157 std::make_shared<std::string>(connInfo.nextProtoName);
159 auto pipeline = childPipelineFactory_->newPipeline(
160 std::shared_ptr<folly::AsyncTransportWrapper>(
162 pipeline->setTransportInfo(tInfoPtr);
179 ConnInfo connInfo = {transport.release(), clientAddr, nextProtocolName,
180 secureTransportType, tinfo};
181 acceptPipeline_->read(connInfo);
186 auto ew = folly::make_exception_wrapper<AcceptorException>(
188 "graceful shutdown timeout");
190 acceptPipeline_->readException(ew);
195 auto ew = folly::make_exception_wrapper<AcceptorException>(
197 "draining some connections", pct);
199 acceptPipeline_->readException(ew);
204 auto ew = folly::make_exception_wrapper<AcceptorException>(
206 "dropping some connections", pct);
208 acceptPipeline_->readException(ew);
213 auto ew = folly::make_exception_wrapper<AcceptorException>(
215 "hard shutdown timeout");
217 acceptPipeline_->readException(ew);
224 std::unique_ptr<folly::IOBuf> buf,
226 acceptPipeline_->read(
239 acceptPipeline_->readException(ex);
249 template <
typename Pipeline>
253 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
256 : acceptPipelineFactory_(acceptPipelineFactory),
257 childPipelineFactory_(childPipelineFactory),
258 accConfig_(accConfig) {}
261 auto acceptor = std::make_shared<ServerAcceptor<Pipeline>>(
262 acceptPipelineFactory_, childPipelineFactory_, accConfig_);
263 acceptor->init(
nullptr, base,
nullptr);
276 std::shared_ptr<AcceptorFactory> acceptorFactory,
278 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
279 std::shared_ptr<ServerSocketFactory> socketFactory)
281 , workersMutex_(
std::make_shared<
Mutex>())
282 , acceptorFactory_(acceptorFactory)
285 , socketFactory_(socketFactory) {
289 template <
typename F>
290 void forEachWorker(F&&
f)
const;
296 threadStarted(thread);
300 threadStopped(thread);
305 std::shared_ptr<Acceptor>>;
312 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
317 template <
typename F>
319 Mutex::ReadHolder holder(workersMutex_.get());
320 for (
const auto& kv : *workers_) {
double getPct() const noexcept
std::shared_ptr< WorkerMap > workers_
AcceptPipeline::Ptr newPipeline(Acceptor *) override
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
ServerAcceptorFactory(std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory, std::shared_ptr< PipelineFactory< Pipeline >> childPipelineFactory, const ServerSocketConfig &accConfig)
void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr) override
ServerWorkerPool(std::shared_ptr< AcceptorFactory > acceptorFactory, folly::IOThreadPoolExecutor *exec, std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase >>> sockets, std::shared_ptr< ServerSocketFactory > socketFactory)
void dumpConnectionState(uint8_t) override
ServerConnection(typename Pipeline::Ptr pipeline)
SharedMutexImpl< true > SharedMutexReadPriority
void deletePipeline(wangle::PipelineBase *p) override
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
void readEOF(Context *) override
void onConnectionRemoved(const ManagedConnection *) override
void dropConnections(double pct) noexceptoverride
constexpr detail::Map< Move > move
std::shared_ptr< Mutex > workersMutex_
void sslConnectionError(const folly::exception_wrapper &ex) override
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
void onConnectionAdded(const ManagedConnection *) override
void acceptStopped() noexceptoverride
std::map< folly::ThreadPoolExecutor::ThreadHandle *, std::shared_ptr< Acceptor >> WorkerMap
virtual void dropConnections(double pctToDrop)
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void drainConnections(double pctToDrain)
void notifyPendingShutdown() override
void acceptStopped() noexceptoverride
void timeoutExpired() noexceptoverride
std::shared_ptr< Acceptor > newAcceptor(folly::EventBase *base) override
void readException(Context *, folly::exception_wrapper) override
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
const ExceptionType type_
void drainConnections(double pct) noexceptoverride
void onDataAvailable(std::shared_ptr< folly::AsyncUDPSocket > socket, const folly::SocketAddress &addr, std::unique_ptr< folly::IOBuf > buf, bool) noexceptoverride
void dropConnection() override
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
void addConnection(wangle::ManagedConnection *connection)
void read(Context *, AcceptPipelineType conn) override
void describe(std::ostream &) const override
NetworkSocket socket(int af, int type, int protocol)
ServerAcceptor(std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory, std::shared_ptr< PipelineFactory< Pipeline >> childPipelineFactory, const ServerSocketConfig &accConfig)
void forceStop() noexceptoverride
AcceptorException(ExceptionType type, const std::string &message)
ServerSocketConfig accConfig_
~ServerConnection() override
ExceptionType getType() const noexcept
AcceptorException(ExceptionType type, const std::string &message, double pct)
std::shared_ptr< ServerSocketFactory > socketFactory_
void forEachWorker(F &&f) const
void refreshTimeout() override
std::shared_ptr< Pipeline > Ptr
virtual void sslConnectionError(const folly::exception_wrapper &ex)
void threadPreviouslyStarted(folly::ThreadPoolExecutor::ThreadHandle *thread) override
void threadNotYetStopped(folly::ThreadPoolExecutor::ThreadHandle *thread) override
void closeWhenIdle() override
AcceptorException(ExceptionType type)
std::shared_ptr< AcceptPipeline > acceptPipeline_
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
ThreadPoolListHook * addr
std::shared_ptr< AcceptorFactory > acceptorFactory_
void onNewConnection(folly::AsyncTransportWrapper::UniquePtr transport, const folly::SocketAddress *clientAddr, const std::string &nextProtocolName, SecureTransportType secureTransportType, const TransportInfo &tinfo) override
virtual void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr)
bool isBusy() const override