proxygen
ServerBootstrap.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
22 #include <iostream>
23 #include <thread>
24 
25 namespace wangle {
26 
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server. It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a Pipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline = wangle::DefaultPipeline>
43  public:
44  ServerBootstrap(const ServerBootstrap& that) = delete;
45  ServerBootstrap(ServerBootstrap&& that) = default;
46 
47  ServerBootstrap() = default;
48 
50  stop();
51  join();
52  }
53 
54  /*
55  * Pipeline used to add connections to event bases.
56  * This is used for UDP or for load balancing
57  * TCP connections to IO threads explicitly
58  */
59  ServerBootstrap* pipeline(std::shared_ptr<AcceptPipelineFactory> factory) {
60  acceptPipelineFactory_ = factory;
61  return this;
62  }
63 
65  std::shared_ptr<ServerSocketFactory> factory) {
66  socketFactory_ = factory;
67  return this;
68  }
69 
71  accConfig_ = accConfig;
72  return this;
73  }
74 
75  /*
76  * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your
77  * Acceptor is responsible for managing the connection pool.
78  *
79  * @param childHandler - acceptor factory to call for each IO thread
80  */
81  ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
83  return this;
84  }
85 
86  /*
87  * Set a pipeline factory that will be called for each new connection
88  *
89  * @param factory pipeline factory to use for each new connection
90  */
92  std::shared_ptr<PipelineFactory<Pipeline>> factory) {
93  childPipelineFactory_ = factory;
94  return this;
95  }
96 
97  /*
98  * Set the IO executor. If not set, a default one will be created
99  * with one thread per core.
100  *
101  * @param io_group - io executor to use for IO threads.
102  */
104  std::shared_ptr<folly::IOThreadPoolExecutor> io_group) {
105  return group(nullptr, io_group);
106  }
107 
108  /*
109  * Set the acceptor executor, and IO executor.
110  *
111  * If no acceptor executor is set, a single thread will be created for accepts
112  * If no IO executor is set, a default of one thread per core will be created
113  *
114  * @param group - acceptor executor to use for acceptor threads.
115  * @param io_group - io executor to use for IO threads.
116  */
118  std::shared_ptr<folly::IOThreadPoolExecutor> accept_group,
119  std::shared_ptr<folly::IOThreadPoolExecutor> io_group) {
120  if (!accept_group) {
121  accept_group = std::make_shared<folly::IOThreadPoolExecutor>(
122  1, std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
123  }
124  if (!io_group) {
125  auto threads = std::thread::hardware_concurrency();
126  if (threads <= 0) {
127  // Reasonable mid-point for concurrency when actual value unknown
128  threads = 8;
129  }
130  io_group = std::make_shared<folly::IOThreadPoolExecutor>(
131  threads, std::make_shared<folly::NamedThreadFactory>("IO Thread"));
132  }
133 
134  // TODO better config checking
135  // CHECK(acceptorFactory_ || childPipelineFactory_);
137 
138  if (acceptorFactory_) {
139  workerFactory_ = std::make_shared<ServerWorkerPool>(
140  acceptorFactory_, io_group.get(), sockets_, socketFactory_);
141  } else {
142  workerFactory_ = std::make_shared<ServerWorkerPool>(
143  std::make_shared<ServerAcceptorFactory<Pipeline>>(
145  io_group.get(),
146  sockets_,
148  }
149 
150  io_group->addObserver(workerFactory_);
151 
152  acceptor_group_ = accept_group;
153  io_group_ = io_group;
154 
155  return this;
156  }
157 
158  /*
159  * Bind to an existing socket
160  *
161  * @param sock Existing socket to use for accepting
162  */
164  if (!workerFactory_) {
165  group(nullptr);
166  }
167 
168  // Since only a single socket is given,
169  // we can only accept on a single thread
170  CHECK(acceptor_group_->numThreads() == 1);
171 
172  std::shared_ptr<folly::AsyncServerSocket> socket(
174  socket->setMaxNumMessagesInQueue(
176 
177  folly::via(acceptor_group_.get(), [&] {
178  socket->attachEventBase(folly::EventBaseManager::get()->getEventBase());
180  socket->startAccepting();
181  }).get();
182 
183  // Startup all the threads
184  workerFactory_->forEachWorker([this, socket](Acceptor* worker){
185  socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
186  [this, worker, socket](){
187  socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
188  });
189  });
190 
191  sockets_->push_back(socket);
192  }
193 
194  void bind(folly::SocketAddress& address) {
195  bindImpl(address);
196  }
197 
198  /*
199  * Bind to a port and start listening.
200  * One of childPipeline or childHandler must be called before bind
201  *
202  * @param port Port to listen on
203  */
204  void bind(int port) {
205  CHECK(port >= 0);
206  folly::SocketAddress address;
207  address.setFromLocalPort(port);
208  bindImpl(address);
209  }
210 
212  if (!workerFactory_) {
213  group(nullptr);
214  }
215 
216  bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);
217 
218  std::mutex sock_lock;
219  std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
220 
221 
222  std::exception_ptr exn;
223 
224  auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier) {
225 
226  try {
227  auto socket = socketFactory_->newSocket(
228  address, socketConfig.acceptBacklog, reusePort, socketConfig);
229  sock_lock.lock();
230  new_sockets.push_back(socket);
231  sock_lock.unlock();
232  socket->getAddress(&address);
233 
234  barrier->post();
235  } catch (...) {
236  exn = std::current_exception();
237  barrier->post();
238 
239  return;
240  }
241 
242  };
243 
244  auto wait0 = std::make_shared<folly::Baton<>>();
245  acceptor_group_->add(std::bind(startupFunc, wait0));
246  wait0->wait();
247 
248  for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
249  auto barrier = std::make_shared<folly::Baton<>>();
250  acceptor_group_->add(std::bind(startupFunc, barrier));
251  barrier->wait();
252  }
253 
254  if (exn) {
255  std::rethrow_exception(exn);
256  }
257 
258  for (auto& socket : new_sockets) {
259  // Startup all the threads
260  workerFactory_->forEachWorker([this, socket](Acceptor* worker){
261  socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
262  [this, worker, socket](){
263  socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
264  });
265  });
266 
267  sockets_->push_back(socket);
268  }
269  }
270 
271  /*
272  * Stop listening on all sockets.
273  */
274  void stop() {
275  // sockets_ may be null if ServerBootstrap has been std::move'd
276  if (sockets_) {
277  sockets_->clear();
278  }
279  if (!stopped_) {
280  stopped_ = true;
281  // stopBaton_ may be null if ServerBootstrap has been std::move'd
282  if (stopBaton_) {
283  stopBaton_->post();
284  }
285  }
286  }
287 
288  void join() {
289  if (acceptor_group_) {
290  acceptor_group_->join();
291  }
292  if (io_group_) {
293  io_group_->join();
294  }
295  }
296 
297  void waitForStop() {
298  if (!stopped_) {
299  CHECK(stopBaton_);
300  stopBaton_->wait();
301  }
302  }
303 
304  /*
305  * Get the list of listening sockets
306  */
307  const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
308  getSockets() const {
309  return *sockets_;
310  }
311 
312  std::shared_ptr<folly::IOThreadPoolExecutor> getIOGroup() const {
313  return io_group_;
314  }
315 
316  template <typename F>
317  void forEachWorker(F&& f) const {
318  if (!workerFactory_) {
319  return;
320  }
321  workerFactory_->forEachWorker(f);
322  }
323 
325 
326  ServerBootstrap* setReusePort(bool reusePort) {
327  reusePort_ = reusePort;
328  return this;
329  }
330 
331  private:
332  std::shared_ptr<folly::IOThreadPoolExecutor> acceptor_group_;
333  std::shared_ptr<folly::IOThreadPoolExecutor> io_group_;
334 
335  std::shared_ptr<ServerWorkerPool> workerFactory_;
336  std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_{
337  std::make_shared<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()};
338 
339  std::shared_ptr<AcceptorFactory> acceptorFactory_;
340  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
341  std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_{
342  std::make_shared<DefaultAcceptPipelineFactory>()};
343  std::shared_ptr<ServerSocketFactory> socketFactory_{
344  std::make_shared<AsyncServerSocketFactory>()};
345 
347 
348  bool reusePort_{false};
349 
350  std::unique_ptr<folly::Baton<>> stopBaton_{
351  std::make_unique<folly::Baton<>>()};
352  bool stopped_{false};
353 };
354 
355 } // namespace wangle
ServerBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > accept_group, std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
ServerBootstrap * channelFactory(std::shared_ptr< ServerSocketFactory > factory)
*than *hazptr_holder h
Definition: Hazptr.h:116
auto f
ServerBootstrap * setReusePort(bool reusePort)
void setFromLocalPort(uint16_t port)
void bind(folly::AsyncServerSocket::UniquePtr s)
ServerSocketConfig accConfig_
std::shared_ptr< ServerWorkerPool > workerFactory_
EventBase * getEventBase()
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
ServerSocketConfig socketConfig
void bind(folly::SocketAddress &address)
static EventBaseManager * get()
ServerBootstrap * childPipeline(std::shared_ptr< PipelineFactory< Pipeline >> factory)
std::shared_ptr< AcceptorFactory > acceptorFactory_
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
std::vector< std::thread::id > threads
std::unique_ptr< folly::Baton<> > stopBaton_
ServerBootstrap * pipeline(std::shared_ptr< AcceptPipelineFactory > factory)
ServerBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
void bindImpl(folly::SocketAddress &address)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
const std::vector< std::shared_ptr< folly::AsyncSocketBase > > & getSockets() const
void forEachWorker(F &&f) const
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
std::shared_ptr< folly::IOThreadPoolExecutor > io_group_
std::mutex mutex
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static set< string > s
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
Definition: Future-inl.h:1290
ServerBootstrap * childHandler(std::shared_ptr< AcceptorFactory > h)
std::shared_ptr< folly::IOThreadPoolExecutor > acceptor_group_
std::shared_ptr< ServerSocketFactory > socketFactory_
std::shared_ptr< folly::IOThreadPoolExecutor > getIOGroup() const
ServerBootstrap * acceptorConfig(const ServerSocketConfig &accConfig)
virtual folly::EventBase * getEventBase() const
Definition: Acceptor.h:127