proxygen
AcceptRoutingHandler-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 namespace wangle {
19 
20 template <typename Pipeline, typename R>
22  Context*,
23  AcceptPipelineType conn) {
24  if (conn.type() != typeid(ConnInfo&)) {
25  return;
26  }
27 
28  populateAcceptors();
29 
30  const auto& connInfo = boost::get<ConnInfo&>(conn);
31  auto socket = std::shared_ptr<folly::AsyncTransportWrapper>(
32  connInfo.sock, folly::DelayedDestruction::Destructor());
33 
34  uint64_t connId = nextConnId_++;
35 
36  // Create a new routing pipeline for this connection to read from
37  // the socket until it parses the routing data
38  auto routingPipeline = newRoutingPipeline();
39  routingPipeline->addBack(wangle::AsyncSocketHandler(socket));
40  routingPipeline->addBack(routingHandlerFactory_->newHandler(connId, this));
41  routingPipeline->finalize();
42 
43  // Initialize TransportInfo and set it on the routing pipeline
44  auto transportInfo = std::make_shared<TransportInfo>(connInfo.tinfo);
46  try {
47  socket->getLocalAddress(&localAddr);
48  socket->getPeerAddress(&peerAddr);
49  } catch (...) {
50  VLOG(2) << "Socket is no longer valid.";
51  return;
52  }
53  transportInfo->localAddr = std::make_shared<folly::SocketAddress>(localAddr);
54  transportInfo->remoteAddr = std::make_shared<folly::SocketAddress>(peerAddr);
55  routingPipeline->setTransportInfo(transportInfo);
56 
57  routingPipeline->transportActive();
58  routingPipelines_[connId] = std::move(routingPipeline);
59 }
60 
61 template <typename Pipeline, typename R>
63  // Null implementation to terminate the call in this handler
64 }
65 
66 template <typename Pipeline, typename R>
68  Context*,
70  // Null implementation to terminate the call in this handler
71 }
72 
73 template <typename Pipeline, typename R>
75  uint64_t connId,
76  typename RoutingDataHandler<R>::RoutingData& routingData) {
77  // Get the routing pipeline corresponding to this connection
78  auto routingPipelineIter = routingPipelines_.find(connId);
79  if (routingPipelineIter == routingPipelines_.end()) {
80  VLOG(2) << "Connection has already been closed, "
81  "or routed to a worker thread.";
82  return;
83  }
84  auto routingPipeline = std::move(routingPipelineIter->second);
85  routingPipelines_.erase(routingPipelineIter);
86 
87  // Fetch the socket from the pipeline and pause reading from the
88  // socket
89  auto socket = std::dynamic_pointer_cast<folly::AsyncSocket>(
90  routingPipeline->getTransport());
91  routingPipeline->transportInactive();
92  socket->detachEventBase();
93 
94  // Hash based on routing data to pick a new acceptor
95  uint64_t hash = std::hash<R>()(routingData.routingData);
96  auto acceptor = acceptors_[hash % acceptors_.size()];
97 
98  // Switch to the new acceptor's thread
99  acceptor->getEventBase()->runInEventBaseThread(
100  [ =, routingData = std::move(routingData) ]() mutable {
101  socket->attachEventBase(acceptor->getEventBase());
102 
103  auto routingHandler =
104  routingPipeline->template getHandler<RoutingDataHandler<R>>();
105  DCHECK(routingHandler);
106  auto transportInfo = routingPipeline->getTransportInfo();
107  auto pipeline = childPipelineFactory_->newPipeline(
108  socket, routingData.routingData, routingHandler, transportInfo);
109 
110  auto connection =
111  new typename ServerAcceptor<Pipeline>::ServerConnection(pipeline);
112  acceptor->addConnection(connection);
113 
114  pipeline->transportActive();
115 
116  // Pass in the buffered bytes to the pipeline
117  pipeline->read(routingData.bufQueue);
118  });
119 }
120 
121 template <typename Pipeline, typename R>
123  uint64_t connId,
125  VLOG(4) << "Exception while parsing routing data: " << ex.what();
126 
127  // Notify all handlers of the exception
128  auto ctx = getContext();
129  auto pipeline =
130  CHECK_NOTNULL(dynamic_cast<AcceptPipeline*>(ctx->getPipeline()));
131  pipeline->readException(ex);
132 
133  // Delete the routing pipeline. This will close and delete the socket as well.
134  routingPipelines_.erase(connId);
135 }
136 
137 template <typename Pipeline, typename R>
139  if (!acceptors_.empty()) {
140  return;
141  }
142  CHECK(server_);
143  server_->forEachWorker(
144  [&](Acceptor* acceptor) { acceptors_.push_back(acceptor); });
145 }
146 
147 } // namespace wangle
folly::fbstring what() const
const SocketAddress peerAddr
Definition: TestUtils.cpp:20
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
Definition: Pipeline.h:277
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void readException(Context *ctx, folly::exception_wrapper ex) override
void readEOF(Context *ctx) override
void onError(uint64_t connId, folly::exception_wrapper ex) override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
void read(Context *ctx, AcceptPipelineType conn) override
const SocketAddress localAddr
Definition: TestUtils.cpp:19
void onRoutingData(uint64_t connId, typename RoutingDataHandler< R >::RoutingData &routingData) override