proxygen
wangle::AcceptRoutingHandler< Pipeline, R > Class Template Reference

#include <AcceptRoutingHandler.h>

Inheritance diagram for wangle::AcceptRoutingHandler< Pipeline, R >:
wangle::InboundHandler< AcceptPipelineType > wangle::RoutingDataHandler< R >::Callback wangle::HandlerBase< InboundHandlerContext< AcceptPipelineType > >

Public Member Functions

 AcceptRoutingHandler (ServerBootstrap< Pipeline > *server, std::shared_ptr< RoutingDataHandlerFactory< R >> routingHandlerFactory, std::shared_ptr< RoutingDataPipelineFactory< Pipeline, R >> childPipelineFactory)
 
void read (Context *ctx, AcceptPipelineType conn) override
 
void readEOF (Context *ctx) override
 
void readException (Context *ctx, folly::exception_wrapper ex) override
 
void onRoutingData (uint64_t connId, typename RoutingDataHandler< R >::RoutingData &routingData) override
 
void onError (uint64_t connId, folly::exception_wrapper ex) override
 
size_t getRoutingPipelineCount () const
 
- Public Member Functions inherited from wangle::InboundHandler< AcceptPipelineType >
 ~InboundHandler () override=default
 
virtual void transportActive (Context *ctx)
 
virtual void transportInactive (Context *ctx)
 
- Public Member Functions inherited from wangle::HandlerBase< InboundHandlerContext< AcceptPipelineType > >
virtual ~HandlerBase ()=default
 
virtual void attachPipeline (InboundHandlerContext< AcceptPipelineType > *)
 
virtual void detachPipeline (InboundHandlerContext< AcceptPipelineType > *)
 
InboundHandlerContext< AcceptPipelineType > * getContext ()
 
- Public Member Functions inherited from wangle::RoutingDataHandler< R >::Callback
virtual ~Callback ()
 
virtual void onRoutingData (uint64_t connId, RoutingData &routingData)=0
 

Private Member Functions

void populateAcceptors ()
 
virtual DefaultPipeline::Ptr newRoutingPipeline ()
 

Private Attributes

ServerBootstrap< Pipeline > * server_
 
std::shared_ptr< RoutingDataHandlerFactory< R > > routingHandlerFactory_
 
std::shared_ptr< RoutingDataPipelineFactory< Pipeline, R > > childPipelineFactory_
 
std::vector< Acceptor * > acceptors_
 
std::map< uint64_t, DefaultPipeline::Ptr > routingPipelines_
 
uint64_t nextConnId_ {0}
 

Additional Inherited Members

- Public Types inherited from wangle::InboundHandler< AcceptPipelineType >
typedef AcceptPipelineType rin
 
typedef AcceptPipelineType rout
 
typedef folly::Unit win
 
typedef folly::Unit wout
 
typedef InboundHandlerContext< AcceptPipelineType > Context
 
- Static Public Attributes inherited from wangle::InboundHandler< AcceptPipelineType >
static const HandlerDir dir
 

Detailed Description

template<typename Pipeline, typename R>
class wangle::AcceptRoutingHandler< Pipeline, R >

Definition at line 42 of file AcceptRoutingHandler.h.

Constructor & Destructor Documentation

template<typename Pipeline, typename R>
wangle::AcceptRoutingHandler< Pipeline, R >::AcceptRoutingHandler ( ServerBootstrap< Pipeline > *  server,
std::shared_ptr< RoutingDataHandlerFactory< R >>  routingHandlerFactory,
std::shared_ptr< RoutingDataPipelineFactory< Pipeline, R >>  childPipelineFactory 
)
inline

Definition at line 45 of file AcceptRoutingHandler.h.

50  : server_(CHECK_NOTNULL(server)),
51  routingHandlerFactory_(routingHandlerFactory),
52  childPipelineFactory_(childPipelineFactory) {}
ServerBootstrap< Pipeline > * server_
std::shared_ptr< RoutingDataHandlerFactory< R > > routingHandlerFactory_
std::shared_ptr< RoutingDataPipelineFactory< Pipeline, R > > childPipelineFactory_

Member Function Documentation

template<typename Pipeline, typename R>
size_t wangle::AcceptRoutingHandler< Pipeline, R >::getRoutingPipelineCount ( ) const
inline

Definition at line 65 of file AcceptRoutingHandler.h.

65  {
66  return routingPipelines_.size();
67  }
std::map< uint64_t, DefaultPipeline::Ptr > routingPipelines_
template<typename Pipeline, typename R>
virtual DefaultPipeline::Ptr wangle::AcceptRoutingHandler< Pipeline, R >::newRoutingPipeline ( )
inline private virtual

Reimplemented in wangle::MockAcceptRoutingHandler.

Definition at line 71 of file AcceptRoutingHandler.h.

71  {
72  return DefaultPipeline::create();
73  }
static Ptr create()
Definition: Pipeline.h:174
template<typename Pipeline , typename R >
void wangle::AcceptRoutingHandler< Pipeline, R >::onError ( uint64_t  connId,
folly::exception_wrapper  ex 
)
override virtual

Implements wangle::RoutingDataHandler< R >::Callback.

Definition at line 122 of file AcceptRoutingHandler-inl.h.

References folly::exception_wrapper::what().

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::AcceptRoutingHandler().

124  {
125  VLOG(4) << "Exception while parsing routing data: " << ex.what();
126 
127  // Notify all handlers of the exception
128  auto ctx = getContext();
129  auto pipeline =
130  CHECK_NOTNULL(dynamic_cast<AcceptPipeline*>(ctx->getPipeline()));
131  pipeline->readException(ex);
132 
133  // Delete the routing pipeline. This will close and delete the socket as well.
134  routingPipelines_.erase(connId);
135 }
folly::fbstring what() const
InboundHandlerContext< AcceptPipelineType > * getContext()
Definition: Handler.h:34
std::map< uint64_t, DefaultPipeline::Ptr > routingPipelines_
template<typename Pipeline , typename R>
void wangle::AcceptRoutingHandler< Pipeline, R >::onRoutingData ( uint64_t  connId,
typename RoutingDataHandler< R >::RoutingData &  routingData 
)
override

Definition at line 74 of file AcceptRoutingHandler-inl.h.

References wangle::RoutingDataHandler< R >::RoutingData::bufQueue, folly::gen::move, wangle::RoutingDataHandler< R >::RoutingData::routingData, folly::netops::socket(), and uint64_t.

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::AcceptRoutingHandler().

76  {
77  // Get the routing pipeline corresponding to this connection
78  auto routingPipelineIter = routingPipelines_.find(connId);
79  if (routingPipelineIter == routingPipelines_.end()) {
80  VLOG(2) << "Connection has already been closed, "
81  "or routed to a worker thread.";
82  return;
83  }
84  auto routingPipeline = std::move(routingPipelineIter->second);
85  routingPipelines_.erase(routingPipelineIter);
86 
87  // Fetch the socket from the pipeline and pause reading from the
88  // socket
89  auto socket = std::dynamic_pointer_cast<folly::AsyncSocket>(
90  routingPipeline->getTransport());
91  routingPipeline->transportInactive();
92  socket->detachEventBase();
93 
94  // Hash based on routing data to pick a new acceptor
95  uint64_t hash = std::hash<R>()(routingData.routingData);
96  auto acceptor = acceptors_[hash % acceptors_.size()];
97 
98  // Switch to the new acceptor's thread
99  acceptor->getEventBase()->runInEventBaseThread(
100  [ =, routingData = std::move(routingData) ]() mutable {
101  socket->attachEventBase(acceptor->getEventBase());
102 
103  auto routingHandler =
104  routingPipeline->template getHandler<RoutingDataHandler<R>>();
105  DCHECK(routingHandler);
106  auto transportInfo = routingPipeline->getTransportInfo();
107  auto pipeline = childPipelineFactory_->newPipeline(
108  socket, routingData.routingData, routingHandler, transportInfo);
109 
110  auto connection =
111  new typename ServerAcceptor<Pipeline>::ServerConnection(pipeline);
112  acceptor->addConnection(connection);
113 
114  pipeline->transportActive();
115 
116  // Pass in the buffered bytes to the pipeline
117  pipeline->read(routingData.bufQueue);
118  });
119 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::map< uint64_t, DefaultPipeline::Ptr > routingPipelines_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< RoutingDataPipelineFactory< Pipeline, R > > childPipelineFactory_
std::vector< Acceptor * > acceptors_
template<typename Pipeline , typename R >
void wangle::AcceptRoutingHandler< Pipeline, R >::populateAcceptors ( )
private

Definition at line 138 of file AcceptRoutingHandler-inl.h.

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::getRoutingPipelineCount().

138  {
139  if (!acceptors_.empty()) {
140  return;
141  }
142  CHECK(server_);
143  server_->forEachWorker(
144  [&](Acceptor* acceptor) { acceptors_.push_back(acceptor); });
145 }
ServerBootstrap< Pipeline > * server_
std::vector< Acceptor * > acceptors_
template<typename Pipeline , typename R >
void wangle::AcceptRoutingHandler< Pipeline, R >::read ( Context *  ctx,
AcceptPipelineType  conn 
)
override virtual

Implements wangle::InboundHandler< AcceptPipelineType >.

Definition at line 21 of file AcceptRoutingHandler-inl.h.

References proxygen::localAddr, folly::gen::move, proxygen::peerAddr, folly::netops::socket(), and uint64_t.

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::AcceptRoutingHandler().

23  {
24  if (conn.type() != typeid(ConnInfo&)) {
25  return;
26  }
27 
29 
30  const auto& connInfo = boost::get<ConnInfo&>(conn);
31  auto socket = std::shared_ptr<folly::AsyncTransportWrapper>(
32  connInfo.sock, folly::DelayedDestruction::Destructor());
33 
34  uint64_t connId = nextConnId_++;
35 
36  // Create a new routing pipeline for this connection to read from
37  // the socket until it parses the routing data
38  auto routingPipeline = newRoutingPipeline();
39  routingPipeline->addBack(wangle::AsyncSocketHandler(socket));
40  routingPipeline->addBack(routingHandlerFactory_->newHandler(connId, this));
41  routingPipeline->finalize();
42 
43  // Initialize TransportInfo and set it on the routing pipeline
44  auto transportInfo = std::make_shared<TransportInfo>(connInfo.tinfo);
46  try {
47  socket->getLocalAddress(&localAddr);
48  socket->getPeerAddress(&peerAddr);
49  } catch (...) {
50  VLOG(2) << "Socket is no longer valid.";
51  return;
52  }
53  transportInfo->localAddr = std::make_shared<folly::SocketAddress>(localAddr);
54  transportInfo->remoteAddr = std::make_shared<folly::SocketAddress>(peerAddr);
55  routingPipeline->setTransportInfo(transportInfo);
56 
57  routingPipeline->transportActive();
58  routingPipelines_[connId] = std::move(routingPipeline);
59 }
const SocketAddress peerAddr
Definition: TestUtils.cpp:20
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
TotalRequest HTTPRequestExchange HTTPResponseBodyRead PreConnect PostConnect DNSResolution DNSCache RetryingDNSResolution TCPConnect TLSSetup TotalConnect decompression_filter cert_verification proxy_connect push scheduling network_change multi_connector single_connector SessionTransactions TCPInfo ConnInfo
std::map< uint64_t, DefaultPipeline::Ptr > routingPipelines_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< RoutingDataHandlerFactory< R > > routingHandlerFactory_
const SocketAddress localAddr
Definition: TestUtils.cpp:19
virtual DefaultPipeline::Ptr newRoutingPipeline()
template<typename Pipeline , typename R >
void wangle::AcceptRoutingHandler< Pipeline, R >::readEOF ( Context *  ctx)
override virtual

Reimplemented from wangle::InboundHandler< AcceptPipelineType >.

Definition at line 62 of file AcceptRoutingHandler-inl.h.

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::AcceptRoutingHandler().

62  {
63  // Null implementation to terminate the call in this handler
64 }
template<typename Pipeline , typename R >
void wangle::AcceptRoutingHandler< Pipeline, R >::readException ( Context *  ctx,
folly::exception_wrapper  ex 
)
override virtual

Reimplemented from wangle::InboundHandler< AcceptPipelineType >.

Definition at line 67 of file AcceptRoutingHandler-inl.h.

Referenced by wangle::AcceptRoutingHandler< DefaultPipeline, char >::AcceptRoutingHandler().

69  {
70  // Null implementation to terminate the call in this handler
71 }

Member Data Documentation

template<typename Pipeline, typename R>
std::vector<Acceptor*> wangle::AcceptRoutingHandler< Pipeline, R >::acceptors_
private

Definition at line 80 of file AcceptRoutingHandler.h.

template<typename Pipeline, typename R>
std::shared_ptr<RoutingDataPipelineFactory<Pipeline, R> > wangle::AcceptRoutingHandler< Pipeline, R >::childPipelineFactory_
private
template<typename Pipeline, typename R>
uint64_t wangle::AcceptRoutingHandler< Pipeline, R >::nextConnId_ {0}
private

Definition at line 82 of file AcceptRoutingHandler.h.

template<typename Pipeline, typename R>
std::shared_ptr<RoutingDataHandlerFactory<R> > wangle::AcceptRoutingHandler< Pipeline, R >::routingHandlerFactory_
private
template<typename Pipeline, typename R>
std::map<uint64_t, DefaultPipeline::Ptr> wangle::AcceptRoutingHandler< Pipeline, R >::routingPipelines_
private
template<typename Pipeline, typename R>
ServerBootstrap<Pipeline>* wangle::AcceptRoutingHandler< Pipeline, R >::server_
private

The documentation for this class was generated from the following files: