20 template <
typename Pipeline,
typename R>
24 if (conn.type() !=
typeid(
ConnInfo&)) {
30 const auto& connInfo = boost::get<ConnInfo&>(conn);
31 auto socket = std::shared_ptr<folly::AsyncTransportWrapper>(
38 auto routingPipeline = newRoutingPipeline();
40 routingPipeline->addBack(routingHandlerFactory_->newHandler(connId,
this));
41 routingPipeline->finalize();
44 auto transportInfo = std::make_shared<TransportInfo>(connInfo.tinfo);
47 socket->getLocalAddress(&localAddr);
48 socket->getPeerAddress(&peerAddr);
50 VLOG(2) <<
"Socket is no longer valid.";
53 transportInfo->localAddr = std::make_shared<folly::SocketAddress>(
localAddr);
54 transportInfo->remoteAddr = std::make_shared<folly::SocketAddress>(
peerAddr);
55 routingPipeline->setTransportInfo(transportInfo);
57 routingPipeline->transportActive();
58 routingPipelines_[connId] =
std::move(routingPipeline);
61 template <
typename Pipeline,
typename R>
66 template <
typename Pipeline,
typename R>
73 template <
typename Pipeline,
typename R>
78 auto routingPipelineIter = routingPipelines_.find(connId);
79 if (routingPipelineIter == routingPipelines_.end()) {
80 VLOG(2) <<
"Connection has already been closed, " 81 "or routed to a worker thread.";
84 auto routingPipeline =
std::move(routingPipelineIter->second);
85 routingPipelines_.erase(routingPipelineIter);
90 routingPipeline->getTransport());
91 routingPipeline->transportInactive();
96 auto acceptor = acceptors_[hash % acceptors_.size()];
99 acceptor->getEventBase()->runInEventBaseThread(
100 [ =, routingData =
std::move(routingData) ]()
mutable {
101 socket->attachEventBase(acceptor->getEventBase());
103 auto routingHandler =
104 routingPipeline->template getHandler<RoutingDataHandler<R>>();
105 DCHECK(routingHandler);
106 auto transportInfo = routingPipeline->getTransportInfo();
107 auto pipeline = childPipelineFactory_->newPipeline(
112 acceptor->addConnection(connection);
114 pipeline->transportActive();
117 pipeline->read(routingData.
bufQueue);
121 template <
typename Pipeline,
typename R>
125 VLOG(4) <<
"Exception while parsing routing data: " << ex.
what();
128 auto ctx = getContext();
130 CHECK_NOTNULL(dynamic_cast<AcceptPipeline*>(ctx->getPipeline()));
131 pipeline->readException(ex);
134 routingPipelines_.erase(connId);
137 template <
typename Pipeline,
typename R>
139 if (!acceptors_.empty()) {
143 server_->forEachWorker(
144 [&](
Acceptor* acceptor) { acceptors_.push_back(acceptor); });
folly::fbstring what() const
const SocketAddress peerAddr
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
constexpr detail::Map< Move > move
folly::IOBufQueue bufQueue
void readException(Context *ctx, folly::exception_wrapper ex) override
void readEOF(Context *ctx) override
void onError(uint64_t connId, folly::exception_wrapper ex) override
NetworkSocket socket(int af, int type, int protocol)
void read(Context *ctx, AcceptPipelineType conn) override
const SocketAddress localAddr
void onRoutingData(uint64_t connId, typename RoutingDataHandler< R >::RoutingData &routingData) override