proxygen
ServerBootstrap-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <folly/ExceptionWrapper.h>
19 #include <folly/SharedMutex.h>
25 #include <wangle/channel/Handler.h>
28 #include <wangle/ssl/SSLStats.h>
29 
30 namespace wangle {
31 
32 class AcceptorException : public std::runtime_error {
33  public:
34  enum class ExceptionType {
35  UNKNOWN = 0,
36  TIMED_OUT = 1,
37  DROPPED = 2,
38  ACCEPT_STOPPED = 3,
39  DRAIN_CONN_PCT = 4,
40  DROP_CONN_PCT = 5,
41  FORCE_STOP = 6,
42  INTERNAL_ERROR = 7,
43  };
44 
46  std::runtime_error(""), type_(type), pct_(0.0) {}
47 
49  std::runtime_error(message), type_(type), pct_(0.0) {}
50 
52  double pct) :
53  std::runtime_error(message), type_(type), pct_(pct) {}
54 
56  double getPct() const noexcept { return pct_; }
57 
58  protected:
60  // the percentage of connections to be drained or dropped during the shutdown
61  const double pct_;
62 };
63 
64 template <typename Pipeline>
66  : public Acceptor
67  , public wangle::InboundHandler<AcceptPipelineType> {
68  public:
71  public:
72  explicit ServerConnection(typename Pipeline::Ptr pipeline)
73  : pipeline_(std::move(pipeline)) {
74  pipeline_->setPipelineManager(this);
75  }
76 
77  void timeoutExpired() noexcept override {
78  auto ew = folly::make_exception_wrapper<AcceptorException>(
80  pipeline_->readException(ew);
81  }
82 
83  void describe(std::ostream&) const override {}
84  bool isBusy() const override {
85  return true;
86  }
87  void notifyPendingShutdown() override {}
88  void closeWhenIdle() override {}
89  void dropConnection() override {
90  auto ew = folly::make_exception_wrapper<AcceptorException>(
92  pipeline_->readException(ew);
93  }
94  void dumpConnectionState(uint8_t /* loglevel */) override {}
95 
97  CHECK(p == pipeline_.get());
98  destroy();
99  }
100 
101  void init() {
102  pipeline_->transportActive();
103  }
104 
105  void refreshTimeout() override {
106  resetTimeout();
107  }
108 
109  private:
110  ~ServerConnection() override {
111  pipeline_->setPipelineManager(nullptr);
112  }
114  };
115 
116  explicit ServerAcceptor(
117  std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
118  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
119  const ServerSocketConfig& accConfig)
120  : Acceptor(accConfig),
121  acceptPipelineFactory_(acceptPipelineFactory),
122  childPipelineFactory_(childPipelineFactory) {
123  }
124 
125  void init(folly::AsyncServerSocket* serverSocket,
126  folly::EventBase* eventBase,
127  SSLStats* stats = nullptr) override {
128  Acceptor::init(serverSocket, eventBase, stats);
129 
130  acceptPipeline_ = acceptPipelineFactory_->newPipeline(this);
131 
132  if (childPipelineFactory_) {
133  // This means a custom AcceptPipelineFactory was not passed in via
134  // pipeline() and we're using the DefaultAcceptPipelineFactory.
135  // Add the default inbound handler here.
136  acceptPipeline_->addBack(this);
137  }
138  acceptPipeline_->finalize();
139  }
140 
141  void read(Context*, AcceptPipelineType conn) override {
142  if (conn.type() != typeid(ConnInfo&)) {
143  return;
144  }
145 
146  auto connInfo = boost::get<ConnInfo&>(conn);
147  folly::AsyncTransportWrapper::UniquePtr transport(connInfo.sock);
148 
149  // Setup local and remote addresses
150  auto tInfoPtr = std::make_shared<TransportInfo>(connInfo.tinfo);
151  tInfoPtr->localAddr =
152  std::make_shared<folly::SocketAddress>(accConfig_.bindAddress);
153  transport->getLocalAddress(tInfoPtr->localAddr.get());
154  tInfoPtr->remoteAddr =
155  std::make_shared<folly::SocketAddress>(*connInfo.clientAddr);
156  tInfoPtr->appProtocol =
157  std::make_shared<std::string>(connInfo.nextProtoName);
158 
159  auto pipeline = childPipelineFactory_->newPipeline(
160  std::shared_ptr<folly::AsyncTransportWrapper>(
161  transport.release(), folly::DelayedDestruction::Destructor()));
162  pipeline->setTransportInfo(tInfoPtr);
163  auto connection = new ServerConnection(std::move(pipeline));
164  Acceptor::addConnection(connection);
165  connection->init();
166  }
167 
168  // Null implementation to terminate the call in this handler
169  // and suppress warnings
170  void readEOF(Context*) override {}
172 
173  /* See Acceptor::onNewConnection for details */
175  const folly::SocketAddress* clientAddr,
176  const std::string& nextProtocolName,
177  SecureTransportType secureTransportType,
178  const TransportInfo& tinfo) override {
179  ConnInfo connInfo = {transport.release(), clientAddr, nextProtocolName,
180  secureTransportType, tinfo};
181  acceptPipeline_->read(connInfo);
182  }
183 
184  // notify the acceptors in the acceptPipeline to drain & drop conns
185  void acceptStopped() noexcept override {
186  auto ew = folly::make_exception_wrapper<AcceptorException>(
188  "graceful shutdown timeout");
189 
190  acceptPipeline_->readException(ew);
192  }
193 
194  void drainConnections(double pct) noexcept override {
195  auto ew = folly::make_exception_wrapper<AcceptorException>(
197  "draining some connections", pct);
198 
199  acceptPipeline_->readException(ew);
201  }
202 
203  void dropConnections(double pct) noexcept override {
204  auto ew = folly::make_exception_wrapper<AcceptorException>(
206  "dropping some connections", pct);
207 
208  acceptPipeline_->readException(ew);
210  }
211 
212  void forceStop() noexcept override {
213  auto ew = folly::make_exception_wrapper<AcceptorException>(
215  "hard shutdown timeout");
216 
217  acceptPipeline_->readException(ew);
219  }
220 
221  // UDP thunk
222  void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,
223  const folly::SocketAddress& addr,
224  std::unique_ptr<folly::IOBuf> buf,
225  bool /* truncated */) noexcept override {
226  acceptPipeline_->read(
227  AcceptPipelineType(make_tuple(buf.release(), socket, addr)));
228  }
229 
230  void onConnectionAdded(const ManagedConnection*) override {
231  acceptPipeline_->read(ConnEvent::CONN_ADDED);
232  }
233 
234  void onConnectionRemoved(const ManagedConnection*) override {
235  acceptPipeline_->read(ConnEvent::CONN_REMOVED);
236  }
237 
238  void sslConnectionError(const folly::exception_wrapper& ex) override {
239  acceptPipeline_->readException(ex);
241  }
242 
243  private:
244  std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
245  std::shared_ptr<AcceptPipeline> acceptPipeline_;
246  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
247 };
248 
249 template <typename Pipeline>
251  public:
253  std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
254  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
255  const ServerSocketConfig& accConfig)
256  : acceptPipelineFactory_(acceptPipelineFactory),
257  childPipelineFactory_(childPipelineFactory),
258  accConfig_(accConfig) {}
259 
260  std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) override {
261  auto acceptor = std::make_shared<ServerAcceptor<Pipeline>>(
262  acceptPipelineFactory_, childPipelineFactory_, accConfig_);
263  acceptor->init(nullptr, base, nullptr);
264  return acceptor;
265  }
266 
267  private:
268  std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
269  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
271 };
272 
274  public:
276  std::shared_ptr<AcceptorFactory> acceptorFactory,
278  std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
279  std::shared_ptr<ServerSocketFactory> socketFactory)
280  : workers_(std::make_shared<WorkerMap>())
281  , workersMutex_(std::make_shared<Mutex>())
282  , acceptorFactory_(acceptorFactory)
283  , exec_(exec)
284  , sockets_(sockets)
285  , socketFactory_(socketFactory) {
286  CHECK(exec);
287  }
288 
289  template <typename F>
290  void forEachWorker(F&& f) const;
291 
292  void threadStarted(folly::ThreadPoolExecutor::ThreadHandle*) override;
293  void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override;
295  folly::ThreadPoolExecutor::ThreadHandle* thread) override {
296  threadStarted(thread);
297  }
299  folly::ThreadPoolExecutor::ThreadHandle* thread) override {
300  threadStopped(thread);
301  }
302 
303  private:
305  std::shared_ptr<Acceptor>>;
307 
308  std::shared_ptr<WorkerMap> workers_;
309  std::shared_ptr<Mutex> workersMutex_;
310  std::shared_ptr<AcceptorFactory> acceptorFactory_;
312  std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
314  std::shared_ptr<ServerSocketFactory> socketFactory_;
315 };
316 
317 template <typename F>
319  Mutex::ReadHolder holder(workersMutex_.get());
320  for (const auto& kv : *workers_) {
321  f(kv.second.get());
322  }
323 }
324 
326  public:
328  return AcceptPipeline::create();
329  }
330 };
331 
332 } // namespace wangle
double getPct() const noexcept
std::shared_ptr< WorkerMap > workers_
AcceptPipeline::Ptr newPipeline(Acceptor *) override
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
Definition: test.c:42
ServerAcceptorFactory(std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory, std::shared_ptr< PipelineFactory< Pipeline >> childPipelineFactory, const ServerSocketConfig &accConfig)
void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr) override
auto f
ServerWorkerPool(std::shared_ptr< AcceptorFactory > acceptorFactory, folly::IOThreadPoolExecutor *exec, std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase >>> sockets, std::shared_ptr< ServerSocketFactory > socketFactory)
ServerConnection(typename Pipeline::Ptr pipeline)
SharedMutexImpl< true > SharedMutexReadPriority
Definition: SharedMutex.h:1509
void deletePipeline(wangle::PipelineBase *p) override
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
Definition: Pipeline.h:277
PskType type
void readEOF(Context *) override
void onConnectionRemoved(const ManagedConnection *) override
void dropConnections(double pct) noexceptoverride
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
std::shared_ptr< Mutex > workersMutex_
void sslConnectionError(const folly::exception_wrapper &ex) override
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
void onConnectionAdded(const ManagedConnection *) override
void acceptStopped() noexceptoverride
std::map< folly::ThreadPoolExecutor::ThreadHandle *, std::shared_ptr< Acceptor >> WorkerMap
virtual void dropConnections(double pctToDrop)
Definition: Acceptor.cpp:530
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void drainConnections(double pctToDrain)
Definition: Acceptor.cpp:485
tuple make_tuple()
Definition: gtest-tuple.h:675
void acceptStopped() noexceptoverride
Definition: Acceptor.cpp:441
static void destroy()
std::shared_ptr< Acceptor > newAcceptor(folly::EventBase *base) override
void readException(Context *, folly::exception_wrapper) override
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
void drainConnections(double pct) noexceptoverride
void onDataAvailable(std::shared_ptr< folly::AsyncUDPSocket > socket, const folly::SocketAddress &addr, std::unique_ptr< folly::IOBuf > buf, bool) noexceptoverride
virtual void forceStop()
Definition: Acceptor.cpp:508
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
static Map map(mapCap)
void addConnection(wangle::ManagedConnection *connection)
Definition: Acceptor.cpp:501
void read(Context *, AcceptPipelineType conn) override
void describe(std::ostream &) const override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
ServerAcceptor(std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory, std::shared_ptr< PipelineFactory< Pipeline >> childPipelineFactory, const ServerSocketConfig &accConfig)
void forceStop() noexceptoverride
AcceptorException(ExceptionType type, const std::string &message)
ExceptionType getType() const noexcept
AcceptorException(ExceptionType type, const std::string &message, double pct)
const char * string
Definition: Conv.cpp:212
std::shared_ptr< ServerSocketFactory > socketFactory_
static Ptr create()
Definition: Pipeline.h:174
const
Definition: upload.py:398
void forEachWorker(F &&f) const
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
virtual void sslConnectionError(const folly::exception_wrapper &ex)
Definition: Acceptor.cpp:422
void threadPreviouslyStarted(folly::ThreadPoolExecutor::ThreadHandle *thread) override
void threadNotYetStopped(folly::ThreadPoolExecutor::ThreadHandle *thread) override
AcceptorException(ExceptionType type)
std::shared_ptr< AcceptPipeline > acceptPipeline_
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
ThreadPoolListHook * addr
std::shared_ptr< AcceptorFactory > acceptorFactory_
void onNewConnection(folly::AsyncTransportWrapper::UniquePtr transport, const folly::SocketAddress *clientAddr, const std::string &nextProtocolName, SecureTransportType secureTransportType, const TransportInfo &tinfo) override
virtual void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr)
Definition: Acceptor.cpp:60