proxygen
AcceptRoutingHandlerTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "Mocks.h"
17 
18 using namespace folly;
19 using namespace testing;
20 using namespace wangle;
21 
24 
25 class TestClientPipelineFactory : public PipelineFactory<DefaultPipeline> {
26  public:
28  std::shared_ptr<AsyncTransportWrapper> socket) override {
29  // Socket should be connected already
30  EXPECT_TRUE(socket->good());
31 
32  auto pipeline = DefaultPipeline::create();
33  pipeline->addBack(wangle::AsyncSocketHandler(socket));
34  pipeline->finalize();
35  return pipeline;
36  }
37 };
38 
40  public:
41  void SetUp() override {
42  routingData_.routingData = 'A';
43 
44  downstreamHandler_ = new MockBytesToBytesHandler();
45  downstreamPipelineFactory_ =
46  std::make_shared<MockDownstreamPipelineFactory>(downstreamHandler_);
47 
48  server_ = std::make_unique<TestServer>();
49 
50  // A routing pipeline with a mock routing handler that we can set
51  // expectations on.
52  routingPipeline_ = DefaultPipeline::create();
53 
54  routingDataHandlerFactory_ =
55  std::make_shared<MockRoutingDataHandlerFactory>();
56  acceptRoutingHandler_ = new MockAcceptRoutingHandler(
57  server_.get(),
58  routingDataHandlerFactory_,
59  downstreamPipelineFactory_,
60  routingPipeline_);
61  routingDataHandler_ =
62  new MockRoutingDataHandler(kConnId0, acceptRoutingHandler_);
63  routingDataHandlerFactory_->setRoutingDataHandler(routingDataHandler_);
64 
65  acceptPipeline_ = AcceptPipeline::create();
66  acceptPipeline_->addBack(
67  std::shared_ptr<MockAcceptRoutingHandler>(acceptRoutingHandler_));
68  acceptPipeline_->finalize();
69 
70  // A single threaded IOGroup shared between client and server for a
71  // deterministic event list.
72  auto ioGroup = std::make_shared<IOThreadPoolExecutor>(kNumIOThreads);
73 
74  acceptPipelineFactory_ =
75  std::make_shared<MockAcceptPipelineFactory>(acceptPipeline_);
76  server_->pipeline(acceptPipelineFactory_)->group(ioGroup, ioGroup)->bind(0);
77  server_->getSockets()[0]->getAddress(&address_);
78  VLOG(4) << "Start server at " << address_;
79  }
80 
82  return server_->getIOGroup()->getEventBase();
83  }
84 
86  client_ = std::make_shared<TestClient>();
87  client_->pipelineFactory(std::make_shared<TestClientPipelineFactory>());
88  client_->group(server_->getIOGroup());
89  return client_->connect(address_);
90  }
91 
93  auto clientPipelinePromise =
94  std::make_shared<folly::Promise<DefaultPipeline*>>();
95 
97  clientConnect().thenValue([=](DefaultPipeline* clientPipeline) {
98  VLOG(4) << "Client connected. Send data.";
99  auto data = IOBuf::create(1);
100  data->append(1);
101  *(data->writableData()) = 'a';
102  clientPipeline->write(std::move(data)).thenValue([=](auto&&) {
103  clientPipelinePromise->setValue(clientPipeline);
104  });
105  });
106  });
107 
108  return clientPipelinePromise->getFuture();
109  }
110 
112  auto clientPipelinePromise =
113  std::make_shared<folly::Promise<DefaultPipeline*>>();
114 
116  clientConnectAndWrite().thenValue([=](DefaultPipeline* clientPipeline) {
117  VLOG(4) << "Client close";
118  clientPipeline->close().thenValue(
119  [=](auto&&) { clientPipelinePromise->setValue(clientPipeline); });
120  });
121  });
122 
123  return clientPipelinePromise->getFuture();
124  }
125 
127  auto clientPipelinePromise =
128  std::make_shared<folly::Promise<DefaultPipeline*>>();
130  clientConnect().thenValue([=](DefaultPipeline* clientPipeline) {
131  clientPipelinePromise->setValue(clientPipeline);
132  });
133  });
134 
135  return clientPipelinePromise->getFuture();
136  }
137 
138  void sendClientException(DefaultPipeline* clientPipeline) {
140  clientPipeline->writeException(
141  std::runtime_error("Client socket exception, right after connect."));
142  });
143  }
144 
145  void TearDown() override {
146  acceptPipeline_.reset();
147  acceptPipelineFactory_->cleanup();
148  }
149 
150  protected:
151  std::unique_ptr<TestServer> server_;
152  std::shared_ptr<MockAcceptPipelineFactory> acceptPipelineFactory_;
155  std::shared_ptr<MockRoutingDataHandlerFactory> routingDataHandlerFactory_;
157 
160  std::shared_ptr<MockDownstreamPipelineFactory> downstreamPipelineFactory_;
163 
164  std::shared_ptr<TestClient> client_;
165 
166  int kConnId0{0};
167  int kNumIOThreads{1};
168 };
169 
170 TEST_F(AcceptRoutingHandlerTest, ParseRoutingDataSuccess) {
171  // Server receives data, and parses routing data
172  EXPECT_CALL(*routingDataHandler_, transportActive(_));
173  EXPECT_CALL(*routingDataHandler_, parseRoutingData(_, _))
174  .WillOnce(
175  Invoke([&](folly::IOBufQueue& /*bufQueue*/,
176  MockRoutingDataHandler::RoutingData& /*routingData*/) {
177  VLOG(4) << "Parsed routing data";
178  return true;
179  }));
180 
181  // Downstream pipeline is created, and its handler receives events
182  boost::barrier barrier(2);
183  EXPECT_CALL(*downstreamHandler_, transportActive(_));
184  EXPECT_CALL(*downstreamHandler_, read(_, _))
185  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/,
186  IOBufQueue& /*bufQueue*/) {
187  VLOG(4) << "Downstream received a read";
188  }));
189  EXPECT_CALL(*downstreamHandler_, readEOF(_))
190  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
191  VLOG(4) << "Downstream EOF";
192  ctx->fireClose();
193  barrier.wait();
194  }));
195  EXPECT_CALL(*downstreamHandler_, transportInactive(_));
196 
197  // Send client request that triggers server processing
198  clientConnectAndCleanClose();
199 
200  barrier.wait();
201 
202  // Routing pipeline has been erased
203  EXPECT_EQ(0, acceptRoutingHandler_->getRoutingPipelineCount());
204 }
205 
206 TEST_F(AcceptRoutingHandlerTest, SocketErrorInRoutingPipeline) {
207  // Server receives data, and parses routing data
208  boost::barrier barrierConnect(2);
209  EXPECT_CALL(*routingDataHandler_, transportActive(_));
210  EXPECT_CALL(*routingDataHandler_, parseRoutingData(_, _))
211  .WillOnce(
212  Invoke([&](folly::IOBufQueue& /*bufQueue*/,
213  MockRoutingDataHandler::RoutingData& /*routingData*/) {
214  VLOG(4) << "Need more data to be parse.";
215  barrierConnect.wait();
216  return false;
217  }));
218 
219  // Send client request that triggers server processing
220  auto futureClientPipeline = clientConnectAndWrite();
221 
222  // Socket exception after routing pipeline had been created
223  barrierConnect.wait();
224  boost::barrier barrierException(2);
225  std::move(futureClientPipeline).thenValue([](DefaultPipeline* clientPipeline) {
226  clientPipeline->getTransport()->getEventBase()->runInEventBaseThread(
227  [clientPipeline]() {
228  clientPipeline->writeException(
229  std::runtime_error("Socket error while expecting routing data."));
230  });
231  });
232  EXPECT_CALL(*routingDataHandler_, readException(_, _))
233  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/,
235  VLOG(4) << "Routing data handler Exception";
236  acceptRoutingHandler_->onError(kConnId0, ex);
237  barrierException.wait();
238  }));
239  barrierException.wait();
240 
241  // Downstream pipeline is not created
242  EXPECT_CALL(*downstreamHandler_, transportActive(_)).Times(0);
243  delete downstreamHandler_;
244 
245  // Routing pipeline has been erased
246  EXPECT_EQ(0, acceptRoutingHandler_->getRoutingPipelineCount());
247 }
248 
249 TEST_F(AcceptRoutingHandlerTest, OnNewConnectionWithBadSocket) {
250  // Routing data handler doesn't receive any data
251  EXPECT_CALL(*routingDataHandler_, parseRoutingData(_, _)).Times(0);
252 
253  // Downstream pipeline is not created
254  EXPECT_CALL(*downstreamHandler_, transportActive(_)).Times(0);
255  delete downstreamHandler_;
256 
257  // Send client request that triggers server processing
258  boost::barrier barrierConnect(2);
259  EXPECT_CALL(*routingDataHandler_, transportActive(_))
260  .WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/) {
261  barrierConnect.wait();
262  }));
263  auto futureClientPipeline = justClientConnect();
264  barrierConnect.wait();
265  futureClientPipeline.wait();
266 
267  // Expect an exception on the routing data handler
268  boost::barrier barrierException(2);
269  EXPECT_CALL(*routingDataHandler_, readException(_, _))
270  .WillOnce(Invoke(
271  [&](MockBytesToBytesHandler::Context* /*ctx*/,
272  folly::exception_wrapper /*ex*/) { barrierException.wait(); }));
273  sendClientException(futureClientPipeline.value());
274  barrierException.wait();
275 
276  // Routing pipeline has been added
277  EXPECT_EQ(1, acceptRoutingHandler_->getRoutingPipelineCount());
278 }
279 
280 TEST_F(AcceptRoutingHandlerTest, RoutingPipelineErasedOnlyOnce) {
281  // Simulate client socket throwing an exception, while routing data handler
282  // parsed data successfully.
283  acceptPipeline_->readException(
284  std::runtime_error("An exception from the socket."));
285  acceptRoutingHandler_->onRoutingData(kConnId0, routingData_);
286 }
Future< DefaultPipeline * > clientConnectAndCleanClose()
std::unique_ptr< TestServer > server_
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type close()
Definition: Pipeline-inl.h:258
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
MockRoutingDataHandler * routingDataHandler_
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
std::shared_ptr< MockRoutingDataHandlerFactory > routingDataHandlerFactory_
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
RoutingDataHandler< char >::RoutingData routingData_
Future< DefaultPipeline * > clientConnectAndWrite()
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type write(W msg)
Definition: Pipeline-inl.h:235
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
MockBytesToBytesHandler * downstreamHandler_
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
Future< DefaultPipeline * > justClientConnect()
EventBase * getEventBase() override
Implements the IOExecutor interface.
Definition: EventBase.cpp:776
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
MockAcceptRoutingHandler * acceptRoutingHandler_
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
std::shared_ptr< folly::AsyncTransport > getTransport()
Definition: Pipeline.h:65
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::shared_ptr< TestClient > client_
std::shared_ptr< MockDownstreamPipelineFactory > downstreamPipelineFactory_
TEST_F(AsyncSSLSocketWriteTest, write_coalescing1)
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type writeException(folly::exception_wrapper e)
Definition: Pipeline-inl.h:246
void sendClientException(DefaultPipeline *clientPipeline)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
Future< DefaultPipeline * > clientConnect()
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
std::shared_ptr< MockAcceptPipelineFactory > acceptPipelineFactory_