proxygen
wangle::ServerBootstrap< Pipeline > Class Template Reference

#include <ServerBootstrap.h>

Public Member Functions

 ServerBootstrap (const ServerBootstrap &that)=delete
 
 ServerBootstrap (ServerBootstrap &&that)=default
 
 ServerBootstrap ()=default
 
 ~ServerBootstrap ()
 
ServerBootstrappipeline (std::shared_ptr< AcceptPipelineFactory > factory)
 
ServerBootstrapchannelFactory (std::shared_ptr< ServerSocketFactory > factory)
 
ServerBootstrapacceptorConfig (const ServerSocketConfig &accConfig)
 
ServerBootstrapchildHandler (std::shared_ptr< AcceptorFactory > h)
 
ServerBootstrapchildPipeline (std::shared_ptr< PipelineFactory< Pipeline >> factory)
 
ServerBootstrapgroup (std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
 
ServerBootstrapgroup (std::shared_ptr< folly::IOThreadPoolExecutor > accept_group, std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
 
void bind (folly::AsyncServerSocket::UniquePtr s)
 
void bind (folly::SocketAddress &address)
 
void bind (int port)
 
void bindImpl (folly::SocketAddress &address)
 
void stop ()
 
void join ()
 
void waitForStop ()
 
const std::vector< std::shared_ptr< folly::AsyncSocketBase > > & getSockets () const
 
std::shared_ptr< folly::IOThreadPoolExecutorgetIOGroup () const
 
template<typename F >
void forEachWorker (F &&f) const
 
ServerBootstrapsetReusePort (bool reusePort)
 

Public Attributes

ServerSocketConfig socketConfig
 

Private Attributes

std::shared_ptr< folly::IOThreadPoolExecutoracceptor_group_
 
std::shared_ptr< folly::IOThreadPoolExecutorio_group_
 
std::shared_ptr< ServerWorkerPoolworkerFactory_
 
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
 
std::shared_ptr< AcceptorFactoryacceptorFactory_
 
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
 
std::shared_ptr< AcceptPipelineFactoryacceptPipelineFactory_
 
std::shared_ptr< ServerSocketFactorysocketFactory_
 
ServerSocketConfig accConfig_
 
bool reusePort_ {false}
 
std::unique_ptr< folly::Baton<> > stopBaton_
 
bool stopped_ {false}
 

Detailed Description

template<typename Pipeline = wangle::DefaultPipeline>
class wangle::ServerBootstrap< Pipeline >

Definition at line 42 of file ServerBootstrap.h.

Constructor & Destructor Documentation

template<typename Pipeline = wangle::DefaultPipeline>
wangle::ServerBootstrap< Pipeline >::ServerBootstrap ( const ServerBootstrap< Pipeline > &  that)
delete
template<typename Pipeline = wangle::DefaultPipeline>
wangle::ServerBootstrap< Pipeline >::ServerBootstrap ( ServerBootstrap< Pipeline > &&  that)
default
template<typename Pipeline = wangle::DefaultPipeline>
wangle::ServerBootstrap< Pipeline >::ServerBootstrap ( )
default
template<typename Pipeline = wangle::DefaultPipeline>
wangle::ServerBootstrap< Pipeline >::~ServerBootstrap ( )
inline

Definition at line 49 of file ServerBootstrap.h.

49  {
50  stop();
51  join();
52  }

Member Function Documentation

template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::acceptorConfig ( const ServerSocketConfig accConfig)
inline

Definition at line 70 of file ServerBootstrap.h.

Referenced by main().

70  {
71  accConfig_ = accConfig;
72  return this;
73  }
ServerSocketConfig accConfig_
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::bind ( folly::AsyncServerSocket::UniquePtr  s)
inline

Definition at line 163 of file ServerBootstrap.h.

Referenced by main(), and wangle::TEST().

163  {
164  if (!workerFactory_) {
165  group(nullptr);
166  }
167 
168  // Since only a single socket is given,
169  // we can only accept on a single thread
170  CHECK(acceptor_group_->numThreads() == 1);
171 
172  std::shared_ptr<folly::AsyncServerSocket> socket(
173  s.release(), AsyncServerSocketFactory::ThreadSafeDestructor());
174  socket->setMaxNumMessagesInQueue(
176 
177  folly::via(acceptor_group_.get(), [&] {
178  socket->attachEventBase(folly::EventBaseManager::get()->getEventBase());
180  socket->startAccepting();
181  }).get();
182 
183  // Startup all the threads
184  workerFactory_->forEachWorker([this, socket](Acceptor* worker){
185  socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
186  [this, worker, socket](){
187  socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
188  });
189  });
190 
191  sockets_->push_back(socket);
192  }
ServerSocketConfig accConfig_
std::shared_ptr< ServerWorkerPool > workerFactory_
EventBase * getEventBase()
ServerSocketConfig socketConfig
static EventBaseManager * get()
ServerBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
static set< string > s
auto via(Executor *x, Func &&func) -> Future< typename isFutureOrSemiFuture< decltype(std::declval< Func >()())>::Inner >
Definition: Future-inl.h:1290
std::shared_ptr< folly::IOThreadPoolExecutor > acceptor_group_
std::shared_ptr< ServerSocketFactory > socketFactory_
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::bind ( folly::SocketAddress address)
inline

Definition at line 194 of file ServerBootstrap.h.

194  {
195  bindImpl(address);
196  }
void bindImpl(folly::SocketAddress &address)
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::bind ( int  port)
inline

Definition at line 204 of file ServerBootstrap.h.

204  {
205  CHECK(port >= 0);
206  folly::SocketAddress address;
207  address.setFromLocalPort(port);
208  bindImpl(address);
209  }
void setFromLocalPort(uint16_t port)
void bindImpl(folly::SocketAddress &address)
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::bindImpl ( folly::SocketAddress address)
inline

Definition at line 211 of file ServerBootstrap.h.

Referenced by wangle::ServerBootstrap< DefaultPipeline >::bind().

211  {
212  if (!workerFactory_) {
213  group(nullptr);
214  }
215 
216  bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);
217 
218  std::mutex sock_lock;
219  std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
220 
221 
222  std::exception_ptr exn;
223 
224  auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier) {
225 
226  try {
227  auto socket = socketFactory_->newSocket(
228  address, socketConfig.acceptBacklog, reusePort, socketConfig);
229  sock_lock.lock();
230  new_sockets.push_back(socket);
231  sock_lock.unlock();
232  socket->getAddress(&address);
233 
234  barrier->post();
235  } catch (...) {
236  exn = std::current_exception();
237  barrier->post();
238 
239  return;
240  }
241 
242  };
243 
244  auto wait0 = std::make_shared<folly::Baton<>>();
245  acceptor_group_->add(std::bind(startupFunc, wait0));
246  wait0->wait();
247 
248  for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
249  auto barrier = std::make_shared<folly::Baton<>>();
250  acceptor_group_->add(std::bind(startupFunc, barrier));
251  barrier->wait();
252  }
253 
254  if (exn) {
256  }
257 
258  for (auto& socket : new_sockets) {
259  // Startup all the threads
260  workerFactory_->forEachWorker([this, socket](Acceptor* worker){
261  socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
262  [this, worker, socket](){
263  socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
264  });
265  });
266 
267  sockets_->push_back(socket);
268  }
269  }
void rethrow_exception(std::exception_ptr ep)
std::shared_ptr< ServerWorkerPool > workerFactory_
ServerSocketConfig socketConfig
ServerBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
std::mutex mutex
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
std::shared_ptr< folly::IOThreadPoolExecutor > acceptor_group_
std::shared_ptr< ServerSocketFactory > socketFactory_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::channelFactory ( std::shared_ptr< ServerSocketFactory factory)
inline

Definition at line 64 of file ServerBootstrap.h.

65  {
66  socketFactory_ = factory;
67  return this;
68  }
std::shared_ptr< ServerSocketFactory > socketFactory_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::childHandler ( std::shared_ptr< AcceptorFactory h)
inline

Definition at line 81 of file ServerBootstrap.h.

81  {
83  return this;
84  }
*than *hazptr_holder h
Definition: Hazptr.h:116
std::shared_ptr< AcceptorFactory > acceptorFactory_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::childPipeline ( std::shared_ptr< PipelineFactory< Pipeline >>  factory)
inline

Definition at line 91 of file ServerBootstrap.h.

Referenced by main(), and wangle::TEST().

92  {
93  childPipelineFactory_ = factory;
94  return this;
95  }
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
template<typename Pipeline = wangle::DefaultPipeline>
template<typename F >
void wangle::ServerBootstrap< Pipeline >::forEachWorker ( F &&  f) const
inline

Definition at line 317 of file ServerBootstrap.h.

317  {
318  if (!workerFactory_) {
319  return;
320  }
321  workerFactory_->forEachWorker(f);
322  }
auto f
std::shared_ptr< ServerWorkerPool > workerFactory_
template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<folly::IOThreadPoolExecutor> wangle::ServerBootstrap< Pipeline >::getIOGroup ( ) const
inline

Definition at line 312 of file ServerBootstrap.h.

312  {
313  return io_group_;
314  }
std::shared_ptr< folly::IOThreadPoolExecutor > io_group_
template<typename Pipeline = wangle::DefaultPipeline>
const std::vector<std::shared_ptr<folly::AsyncSocketBase> >& wangle::ServerBootstrap< Pipeline >::getSockets ( ) const
inline

Definition at line 308 of file ServerBootstrap.h.

Referenced by wangle::TEST().

308  {
309  return *sockets_;
310  }
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::group ( std::shared_ptr< folly::IOThreadPoolExecutor io_group)
inline

Definition at line 103 of file ServerBootstrap.h.

Referenced by wangle::ServerBootstrap< DefaultPipeline >::bind(), wangle::ServerBootstrap< DefaultPipeline >::bindImpl(), wangle::ServerBootstrap< DefaultPipeline >::group(), and main().

104  {
105  return group(nullptr, io_group);
106  }
ServerBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > io_group)
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::group ( std::shared_ptr< folly::IOThreadPoolExecutor accept_group,
std::shared_ptr< folly::IOThreadPoolExecutor io_group 
)
inline

Definition at line 117 of file ServerBootstrap.h.

119  {
120  if (!accept_group) {
121  accept_group = std::make_shared<folly::IOThreadPoolExecutor>(
122  1, std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
123  }
124  if (!io_group) {
125  auto threads = std::thread::hardware_concurrency();
126  if (threads <= 0) {
127  // Reasonable mid-point for concurrency when actual value unknown
128  threads = 8;
129  }
130  io_group = std::make_shared<folly::IOThreadPoolExecutor>(
131  threads, std::make_shared<folly::NamedThreadFactory>("IO Thread"));
132  }
133 
134  // TODO better config checking
135  // CHECK(acceptorFactory_ || childPipelineFactory_);
137 
138  if (acceptorFactory_) {
139  workerFactory_ = std::make_shared<ServerWorkerPool>(
140  acceptorFactory_, io_group.get(), sockets_, socketFactory_);
141  } else {
142  workerFactory_ = std::make_shared<ServerWorkerPool>(
143  std::make_shared<ServerAcceptorFactory<Pipeline>>(
145  io_group.get(),
146  sockets_,
148  }
149 
150  io_group->addObserver(workerFactory_);
151 
152  acceptor_group_ = accept_group;
153  io_group_ = io_group;
154 
155  return this;
156  }
ServerSocketConfig accConfig_
std::shared_ptr< ServerWorkerPool > workerFactory_
std::shared_ptr< PipelineFactory< Pipeline > > childPipelineFactory_
std::shared_ptr< AcceptorFactory > acceptorFactory_
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
std::vector< std::thread::id > threads
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
std::shared_ptr< folly::IOThreadPoolExecutor > io_group_
std::shared_ptr< folly::IOThreadPoolExecutor > acceptor_group_
std::shared_ptr< ServerSocketFactory > socketFactory_
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::join ( )
inline

Definition at line 288 of file ServerBootstrap.h.

Referenced by wangle::ServerBootstrap< DefaultPipeline >::~ServerBootstrap().

288  {
289  if (acceptor_group_) {
290  acceptor_group_->join();
291  }
292  if (io_group_) {
293  io_group_->join();
294  }
295  }
std::shared_ptr< folly::IOThreadPoolExecutor > io_group_
std::shared_ptr< folly::IOThreadPoolExecutor > acceptor_group_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::pipeline ( std::shared_ptr< AcceptPipelineFactory factory)
inline

Definition at line 59 of file ServerBootstrap.h.

Referenced by main().

59  {
60  acceptPipelineFactory_ = factory;
61  return this;
62  }
std::shared_ptr< AcceptPipelineFactory > acceptPipelineFactory_
template<typename Pipeline = wangle::DefaultPipeline>
ServerBootstrap* wangle::ServerBootstrap< Pipeline >::setReusePort ( bool  reusePort)
inline

Definition at line 326 of file ServerBootstrap.h.

326  {
327  reusePort_ = reusePort;
328  return this;
329  }
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::stop ( )
inline

Definition at line 274 of file ServerBootstrap.h.

Referenced by wangle::TEST(), and wangle::ServerBootstrap< DefaultPipeline >::~ServerBootstrap().

274  {
275  // sockets_ may be null if ServerBootstrap has been std::move'd
276  if (sockets_) {
277  sockets_->clear();
278  }
279  if (!stopped_) {
280  stopped_ = true;
281  // stopBaton_ may be null if ServerBootstrap has been std::move'd
282  if (stopBaton_) {
283  stopBaton_->post();
284  }
285  }
286  }
std::unique_ptr< folly::Baton<> > stopBaton_
std::shared_ptr< std::vector< std::shared_ptr< folly::AsyncSocketBase > > > sockets_
template<typename Pipeline = wangle::DefaultPipeline>
void wangle::ServerBootstrap< Pipeline >::waitForStop ( )
inline

Definition at line 297 of file ServerBootstrap.h.

Referenced by main().

297  {
298  if (!stopped_) {
299  CHECK(stopBaton_);
300  stopBaton_->wait();
301  }
302  }
std::unique_ptr< folly::Baton<> > stopBaton_

Member Data Documentation

template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<AcceptorFactory> wangle::ServerBootstrap< Pipeline >::acceptorFactory_
private
template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<AcceptPipelineFactory> wangle::ServerBootstrap< Pipeline >::acceptPipelineFactory_
private
Initial value:
{
std::make_shared<DefaultAcceptPipelineFactory>()}

Definition at line 341 of file ServerBootstrap.h.

Referenced by wangle::ServerBootstrap< DefaultPipeline >::group(), and wangle::ServerBootstrap< DefaultPipeline >::pipeline().

template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<PipelineFactory<Pipeline> > wangle::ServerBootstrap< Pipeline >::childPipelineFactory_
private
template<typename Pipeline = wangle::DefaultPipeline>
bool wangle::ServerBootstrap< Pipeline >::reusePort_ {false}
private
template<typename Pipeline = wangle::DefaultPipeline>
ServerSocketConfig wangle::ServerBootstrap< Pipeline >::socketConfig
template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<ServerSocketFactory> wangle::ServerBootstrap< Pipeline >::socketFactory_
private
template<typename Pipeline = wangle::DefaultPipeline>
std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase> > > wangle::ServerBootstrap< Pipeline >::sockets_
private
template<typename Pipeline = wangle::DefaultPipeline>
std::unique_ptr<folly::Baton<> > wangle::ServerBootstrap< Pipeline >::stopBaton_
private
Initial value:
{
std::make_unique<folly::Baton<>>()}

Definition at line 350 of file ServerBootstrap.h.

Referenced by wangle::ServerBootstrap< DefaultPipeline >::stop(), and wangle::ServerBootstrap< DefaultPipeline >::waitForStop().

template<typename Pipeline = wangle::DefaultPipeline>
bool wangle::ServerBootstrap< Pipeline >::stopped_ {false}
private

The documentation for this class was generated from the following file: