proxygen
BootstrapTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
19 #include "wangle/channel/Handler.h"
20 
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24 #include <folly/String.h>
26 
27 using namespace wangle;
28 using namespace folly;
29 
31 
34 
35 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
36  public:
38  std::shared_ptr<AsyncTransportWrapper> sock) override {
39  // Socket should be connected already
40  EXPECT_TRUE(sock->good());
41 
42  // Check after a small delay that socket is readable
44  EXPECT_TRUE(sock->readable());
45  }, 100);
46 
47  auto pipeline = BytesPipeline::create();
48  pipeline->addBack(new BytesToBytesHandler());
49  pipeline->finalize();
50  return pipeline;
51  }
52 };
53 
54 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
55  public:
57  std::shared_ptr<AsyncTransportWrapper>) override {
58  pipelines++;
59  auto pipeline = BytesPipeline::create();
60  pipeline->addBack(new BytesToBytesHandler());
61  pipeline->finalize();
62  return pipeline;
63  }
64  std::atomic<int> pipelines{0};
65 };
66 
67 class TestAcceptor : public Acceptor {
69  public:
71  Acceptor::init(nullptr, &base_);
72  }
74  const folly::SocketAddress*,
75  const std::string& /* nextProtocolName */,
77  const TransportInfo&) override {}
78 };
79 
81  public:
82  std::shared_ptr<Acceptor> newAcceptor(EventBase*) override {
83  return std::make_shared<TestAcceptor>();
84  }
85 };
86 
87 TEST(Bootstrap, Basic) {
88  TestServer server;
89  TestClient client;
90 }
91 
92 TEST(Bootstrap, ServerWithPipeline) {
93  TestServer server;
94  server.childPipeline(std::make_shared<TestPipelineFactory>());
95  server.bind(0);
96  server.stop();
97 }
98 
99 TEST(Bootstrap, ServerWithChildHandler) {
100  TestServer server;
101  server.childHandler(std::make_shared<TestAcceptorFactory>());
102  server.bind(0);
103  server.stop();
104 }
105 
106 TEST(Bootstrap, ClientServerTest) {
107  TestServer server;
108  auto factory = std::make_shared<TestPipelineFactory>();
109  server.childPipeline(factory);
110  server.bind(0);
111  auto base = EventBaseManager::get()->getEventBase();
112 
113  SocketAddress address;
114  server.getSockets()[0]->getAddress(&address);
115 
116  TestClient client;
117  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
118  client.connect(address);
119  base->loop();
120  server.stop();
121  server.join();
122 
123  EXPECT_EQ(factory->pipelines, 1);
124 }
125 
126 TEST(Bootstrap, ClientConnectionManagerTest) {
127  // Create a single IO thread, and verify that
128  // client connections are pooled properly
129 
130  TestServer server;
131  auto factory = std::make_shared<TestPipelineFactory>();
132  server.childPipeline(factory);
133  server.group(std::make_shared<IOThreadPoolExecutor>(1));
134  server.bind(0);
135  auto base = EventBaseManager::get()->getEventBase();
136 
137  SocketAddress address;
138  server.getSockets()[0]->getAddress(&address);
139 
140  TestClient client;
141  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
142 
143  client.connect(address);
144 
145  TestClient client2;
146  client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
147  client2.connect(address);
148 
149  base->loop();
150  server.stop();
151  server.join();
152 
153  EXPECT_EQ(factory->pipelines, 2);
154 }
155 
156 TEST(Bootstrap, ServerAcceptGroupTest) {
157  // Verify that server is using the accept IO group
158 
159  TestServer server;
160  auto factory = std::make_shared<TestPipelineFactory>();
161  server.childPipeline(factory);
162  server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
163  server.bind(0);
164 
165  SocketAddress address;
166  server.getSockets()[0]->getAddress(&address);
167 
168  boost::barrier barrier(2);
169  auto thread = std::thread([&](){
170  TestClient client;
171  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
172  client.connect(address);
174  barrier.wait();
175  });
176  barrier.wait();
177  server.stop();
178  thread.join();
179  server.join();
180 
181  EXPECT_EQ(factory->pipelines, 1);
182 }
183 
184 TEST(Bootstrap, ServerAcceptGroup2Test) {
185  // Verify that server is using the accept IO group
186 
187  // Check if reuse port is supported, if not, don't run this test
188  try {
189  EventBase base;
190  auto serverSocket = AsyncServerSocket::newSocket(&base);
191  serverSocket->bind(0);
192  serverSocket->listen(0);
193  serverSocket->startAccepting();
194  serverSocket->setReusePortEnabled(true);
195  serverSocket->stopAccepting();
196  } catch(...) {
197  LOG(INFO) << "Reuse port probably not supported";
198  return;
199  }
200 
201  TestServer server;
202  auto factory = std::make_shared<TestPipelineFactory>();
203  server.childPipeline(factory);
204  server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
205  server.bind(0);
206 
207  SocketAddress address;
208  server.getSockets()[0]->getAddress(&address);
209 
210  TestClient client;
211  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
212 
213  client.connect(address);
215 
216  server.stop();
217  server.join();
218 
219  EXPECT_EQ(factory->pipelines, 1);
220 }
221 
222 TEST(Bootstrap, SharedThreadPool) {
223  // Check if reuse port is supported, if not, don't run this test
224  try {
225  EventBase base;
226  auto serverSocket = AsyncServerSocket::newSocket(&base);
227  serverSocket->bind(0);
228  serverSocket->listen(0);
229  serverSocket->startAccepting();
230  serverSocket->setReusePortEnabled(true);
231  serverSocket->stopAccepting();
232  } catch(...) {
233  LOG(INFO) << "Reuse port probably not supported";
234  return;
235  }
236 
237  auto pool = std::make_shared<IOThreadPoolExecutor>(2);
238 
239  TestServer server;
240  auto factory = std::make_shared<TestPipelineFactory>();
241  server.childPipeline(factory);
242  server.group(pool, pool);
243 
244  server.bind(0);
245 
246  SocketAddress address;
247  server.getSockets()[0]->getAddress(&address);
248 
249  TestClient client;
250  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
251  client.connect(address);
252 
253  TestClient client2;
254  client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
255  client2.connect(address);
256 
257  TestClient client3;
258  client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
259  client3.connect(address);
260 
261  TestClient client4;
262  client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
263  client4.connect(address);
264 
265  TestClient client5;
266  client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
267  client5.connect(address);
268 
270 
271  server.stop();
272  server.join();
273 
274  EXPECT_EQ(factory->pipelines, 5);
275 }
276 
277 TEST(Bootstrap, ExistingSocket) {
278  TestServer server;
279  auto factory = std::make_shared<TestPipelineFactory>();
280  server.childPipeline(factory);
282  server.bind(std::move(socket));
283 }
284 
285 std::atomic<int> connections{0};
286 
287 class TestHandlerPipeline : public InboundHandler<AcceptPipelineType> {
288  public:
289  void read(Context* ctx, AcceptPipelineType conn) override {
290  if (conn.type() == typeid(ConnEvent)) {
291  auto connEvent = boost::get<ConnEvent>(conn);
292  if (connEvent == ConnEvent::CONN_ADDED) {
293  connections++;
294  }
295  }
296  return ctx->fireRead(conn);
297  }
298 };
299 
300 template <typename HandlerPipeline>
302  public:
304  auto pipeline = AcceptPipeline::create();
305  pipeline->addBack(HandlerPipeline());
306  return pipeline;
307  }
308 };
309 
310 TEST(Bootstrap, LoadBalanceHandler) {
311  TestServer server;
312  auto factory = std::make_shared<TestPipelineFactory>();
313  server.childPipeline(factory);
314 
315  auto pipelinefactory =
316  std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
317  server.pipeline(pipelinefactory);
318  server.bind(0);
319  auto base = EventBaseManager::get()->getEventBase();
320 
321  SocketAddress address;
322  server.getSockets()[0]->getAddress(&address);
323 
324  TestClient client;
325  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
326  client.connect(address);
327  base->loop();
328  server.stop();
329  server.join();
330 
331  EXPECT_EQ(factory->pipelines, 1);
333 }
334 
335 class TestUDPPipeline : public InboundHandler<AcceptPipelineType, Unit> {
336  public:
337  void read(Context*, AcceptPipelineType) override { connections++; }
338 };
339 
340 TEST(Bootstrap, UDP) {
341  TestServer server;
342  auto factory = std::make_shared<TestPipelineFactory>();
343  auto pipelinefactory =
344  std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
345  server.pipeline(pipelinefactory);
346  server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
347  server.bind(0);
348 }
349 
350 TEST(Bootstrap, UDPClientServerTest) {
351  connections = 0;
352 
353  TestServer server;
354  auto factory = std::make_shared<TestPipelineFactory>();
355  auto pipelinefactory =
356  std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
357  server.pipeline(pipelinefactory);
358  server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
359  server.bind(0);
360 
361  auto base = EventBaseManager::get()->getEventBase();
362 
363  SocketAddress address;
364  server.getSockets()[0]->getAddress(&address);
365 
366  SocketAddress localhost("::1", 0);
367  AsyncUDPSocket client(base);
368  client.bind(localhost);
369  auto data = IOBuf::create(1);
370  data->append(1);
371  *(data->writableData()) = 'a';
372  client.write(address, std::move(data));
373  base->loop();
374  server.stop();
375  server.join();
376 
378 }
379 
380 TEST(Bootstrap, UnixServer) {
381  TestServer server;
382  auto factory = std::make_shared<TestPipelineFactory>();
383 
384  folly::test::TemporaryDirectory tmpdir("wangle-bootstrap-test");
385  auto socketPath = (tmpdir.path() / "sock").string();
386 
387  server.childPipeline(factory);
388  SocketAddress address;
389  address.setFromPath(socketPath);
390  server.bind(address);
391  auto base = EventBaseManager::get()->getEventBase();
392 
393  TestClient client;
394  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
395  auto pipelineFuture = client.connect(address);
396  base->loop();
397  server.stop();
398  server.join();
399 
400  EXPECT_TRUE(std::move(pipelineFuture).get() != nullptr);
401  EXPECT_EQ(factory->pipelines, 1);
402 }
403 
404 TEST(Bootstrap, ServerBindFailure) {
405  // Bind to a TCP socket
406  EventBase base;
407  auto serverSocket = AsyncServerSocket::newSocket(&base);
408  serverSocket->bind(0);
409  serverSocket->listen(0);
410 
411  SocketAddress address;
412  serverSocket->getAddress(&address);
413 
414  // Now try starting a server using the address we are already listening on
415  // This should fail.
416 
417  TestServer server;
418  auto factory = std::make_shared<TestPipelineFactory>();
419  server.childPipeline(factory);
420  try {
421  server.bind(address);
422  FAIL() << "shouldn't be allowed to bind to an in-use address";
423  } catch (const std::system_error& ex) {
424  EXPECT_EQ(EADDRINUSE, ex.code().value()) << "unexpected error code " <<
425  ex.code().value() << ": " << ex.code().message();
426  }
427 }
void setFromPath(StringPiece path)
EventBase * getEventBase() const
#define FAIL()
Definition: gtest.h:1822
TEST(Wangle, ClientServerTest)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
std::atomic< int > connections
ConnEvent
Definition: Pipeline.h:266
EventBase base_
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
Definition: Pipeline.h:277
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
HandlerAdapter< folly::IOBufQueue &, std::unique_ptr< folly::IOBuf > > BytesToBytesHandler
Definition: Handler.h:173
folly::Future< Pipeline * > connect(const folly::SocketAddress &address, std::chrono::milliseconds timeout=std::chrono::milliseconds(0)) override
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void read(Context *ctx, AcceptPipelineType conn) override
static EventBaseManager * get()
virtual ssize_t write(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf)
BytesPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock) override
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
BaseClientBootstrap< P > * pipelineFactory(std::shared_ptr< PipelineFactory< P >> factory) noexcept
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
std::shared_ptr< Acceptor > newAcceptor(EventBase *) override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
virtual void bind(const folly::SocketAddress &address)
void read(Context *, AcceptPipelineType) override
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
Pipeline< IOBufQueue &, std::unique_ptr< IOBuf > > BytesPipeline
ClientBootstrap< BytesPipeline > TestClient
AcceptPipeline::Ptr newPipeline(Acceptor *) override
virtual void fireRead(In msg)=0
BytesPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper >) override
const folly::SocketAddress & getAddress() const
const char * string
Definition: Conv.cpp:212
void onNewConnection(AsyncTransportWrapper::UniquePtr, const folly::SocketAddress *, const std::string &, SecureTransportType, const TransportInfo &) override
static Ptr create()
Definition: Pipeline.h:174
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
virtual void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr)
Definition: Acceptor.cpp:60
ServerBootstrap< BytesPipeline > TestServer