proxygen
AsyncUDPSocketGSOTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <numeric>
18 #include <thread>
19 
20 #include <folly/Conv.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
29 
33 using folly::EventBase;
34 using folly::IOBuf;
36 using namespace testing;
37 
38 struct TestData {
40  int gso,
41  bool useSocketGSO,
42  int* in,
43  size_t inLen,
44  int* expected,
45  size_t expectedLen)
46  : gso_(gso), useSocketGSO_(useSocketGSO) {
47  in_.assign(in, in + inLen);
48  expected_.assign(expected, expected + expectedLen);
49 
50  expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0);
51  }
52 
53  bool checkIn() const {
54  return (expectedSize_ == std::accumulate(in_.begin(), in_.end(), 0));
55  }
56 
57  bool checkOut() const {
58  return (expectedSize_ == std::accumulate(out_.begin(), out_.end(), 0));
59  }
60 
61  bool appendOut(int num) {
62  out_.push_back(num);
63  outSize_ += num;
64 
65  return (outSize_ >= expectedSize_);
66  }
67 
68  std::unique_ptr<folly::IOBuf> getInBuf() {
69  if (!in_.size()) {
70  return nullptr;
71  }
72 
73  std::string str(in_[0], 'A');
74  std::unique_ptr<folly::IOBuf> ret =
75  folly::IOBuf::copyBuffer(str.data(), str.size());
76 
77  for (size_t i = 1; i < in_.size(); i++) {
78  str = std::string(in_[i], 'A');
79  ret->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size()));
80  }
81 
82  return ret;
83  }
84 
85  int gso_{0};
86  bool useSocketGSO_{false};
87  std::vector<int> in_;
88  std::vector<int> expected_; // expected
90  std::vector<int> out_;
91  int outSize_{0};
92 };
93 
94 class UDPAcceptor : public AsyncUDPServerSocket::Callback {
95  public:
96  UDPAcceptor(EventBase* evb) : evb_(evb) {}
97 
98  void onListenStarted() noexcept override {}
99 
100  void onListenStopped() noexcept override {}
101 
103  std::shared_ptr<folly::AsyncUDPSocket> socket,
104  const folly::SocketAddress& client,
105  std::unique_ptr<folly::IOBuf> data,
106  bool /*unused*/) noexcept override {
107  // send pong
108  socket->write(client, data->clone());
109  }
110 
111  private:
112  EventBase* const evb_{nullptr};
113 };
114 
115 class UDPServer {
116  public:
118  : evb_(evb), addr_(addr), evbs_(n) {}
119 
120  void start() {
121  CHECK(evb_->isInEventBaseThread());
122 
123  socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
124 
125  try {
126  socket_->bind(addr_);
127  VLOG(4) << "Server listening on " << socket_->address().describe();
128  } catch (const std::exception& ex) {
129  LOG(FATAL) << ex.what();
130  }
131 
132  acceptors_.reserve(evbs_.size());
133  threads_.reserve(evbs_.size());
134 
135  // Add numWorkers thread
136  int i = 0;
137  for (auto& evb : evbs_) {
138  acceptors_.emplace_back(&evb);
139 
140  std::thread t([&]() { evb.loopForever(); });
141 
142  evb.waitUntilRunning();
143 
144  socket_->addListener(&evb, &acceptors_[i]);
145  threads_.emplace_back(std::move(t));
146  ++i;
147  }
148 
149  socket_->listen();
150  }
151 
153  return socket_->address();
154  }
155 
156  void shutdown() {
157  CHECK(evb_->isInEventBaseThread());
158  socket_->close();
159  socket_.reset();
160 
161  for (auto& evb : evbs_) {
162  evb.terminateLoopSoon();
163  }
164 
165  for (auto& t : threads_) {
166  t.join();
167  }
168  }
169 
170  void pauseAccepting() {
171  socket_->pauseAccepting();
172  }
173 
175  socket_->resumeAccepting();
176  }
177 
178  private:
179  EventBase* const evb_{nullptr};
181 
182  std::unique_ptr<AsyncUDPServerSocket> socket_;
183  std::vector<std::thread> threads_;
184  std::vector<folly::EventBase> evbs_;
185  std::vector<UDPAcceptor> acceptors_;
186 };
187 
188 class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
189  public:
190  explicit UDPClient(EventBase* evb, TestData& testData)
191  : AsyncTimeout(evb), evb_(evb), testData_(testData) {}
192 
193  void start(const folly::SocketAddress& server) {
194  CHECK(evb_->isInEventBaseThread());
195  server_ = server;
196  socket_ = std::make_unique<AsyncUDPSocket>(evb_);
197 
198  try {
199  socket_->bind(folly::SocketAddress("127.0.0.1", 0));
200  if (connectAddr_) {
201  connect();
202  }
203  VLOG(2) << "Client bound to " << socket_->address().describe();
204  } catch (const std::exception& ex) {
205  LOG(FATAL) << ex.what();
206  }
207 
208  // succeed if GSO not available
209  if (socket_->getGSO() < 0) {
210  LOG(INFO) << "GSO not supported";
211  testData_.out_ = testData_.expected_;
212  shutdown();
213  return;
214  }
215 
216  if (testData_.useSocketGSO_) {
217  socket_->setGSO(testData_.gso_);
218  } else {
219  socket_->setGSO(0);
220  }
221 
222  socket_->resumeRead(this);
223 
224  // Start playing ping pong
225  sendPing();
226  }
227 
228  void connect() {
229  int ret = socket_->connect(*connectAddr_);
230  if (ret != 0) {
232  folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
233  }
234  VLOG(2) << "Client connected to address=" << *connectAddr_;
235  }
236 
237  void shutdown() {
238  CHECK(evb_->isInEventBaseThread());
239  socket_->pauseRead();
240  socket_->close();
241  socket_.reset();
243  }
244 
245  void sendPing() {
246  scheduleTimeout(5);
247  writePing(
248  testData_.getInBuf(), testData_.useSocketGSO_ ? -1 : testData_.gso_);
249  }
250 
251  virtual void writePing(std::unique_ptr<folly::IOBuf> buf, int gso) {
252  socket_->writeGSO(server_, std::move(buf), gso);
253  }
254 
255  void getReadBuffer(void** buf, size_t* len) noexcept override {
256  *buf = buf_;
257  *len = sizeof(buf_);
258  }
259 
261  const folly::SocketAddress& /*unused*/,
262  size_t len,
263  bool /*unused*/) noexcept override {
264  VLOG(0) << "Got " << len << " bytes";
265  if (testData_.appendOut(len)) {
266  shutdown();
267  }
268  }
269 
271  VLOG(4) << ex.what();
272 
273  // Start listening for next PONG
274  socket_->resumeRead(this);
275  }
276 
277  void onReadClosed() noexcept override {
278  CHECK(false) << "We unregister reads before closing";
279  }
280 
281  void timeoutExpired() noexcept override {
282  VLOG(4) << "Timeout expired";
283  shutdown();
284  }
285 
287  return *socket_;
288  }
289 
290  void setShouldConnect(const folly::SocketAddress& connectAddr) {
291  connectAddr_ = connectAddr;
292  }
293 
294  protected:
296  EventBase* const evb_{nullptr};
297 
299  std::unique_ptr<AsyncUDPSocket> socket_;
300 
301  private:
302  char buf_[2048];
303  TestData& testData_;
304 };
305 
307  public:
308  void SetUp() override {
309  server = std::make_unique<UDPServer>(
310  &sevb, folly::SocketAddress("127.0.0.1", 0), 1);
311 
312  // Start event loop in a separate thread
313  serverThread =
314  std::make_unique<std::thread>([this]() { sevb.loopForever(); });
315 
316  // Wait for event loop to start
317  sevb.waitUntilRunning();
318  }
319 
320  void startServer() {
321  // Start the server
322  sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
323  LOG(INFO) << "Server listening=" << server->address();
324  }
325 
326  void TearDown() override {
327  // Shutdown server
328  sevb.runInEventBaseThread([&]() {
329  server->shutdown();
330  sevb.terminateLoopSoon();
331  });
332 
333  // Wait for server thread to join
334  serverThread->join();
335  }
336 
337  std::unique_ptr<UDPClient> performPingPongTest(
338  TestData& testData,
339  folly::Optional<folly::SocketAddress> connectedAddress);
340 
343  TestData* testData_{nullptr};
344  std::unique_ptr<std::thread> serverThread;
345  std::unique_ptr<UDPServer> server;
346  std::unique_ptr<UDPClient> client;
347 };
348 
350  TestData& testData,
351  folly::Optional<folly::SocketAddress> connectedAddress) {
352  testData_ = &testData;
353 
354  client = std::make_unique<UDPClient>(&cevb, testData);
355  if (connectedAddress) {
356  client->setShouldConnect(*connectedAddress);
357  }
358 
359  // Start event loop in a separate thread
360  auto clientThread = std::thread([this]() { cevb.loopForever(); });
361 
362  // Wait for event loop to start
363  cevb.waitUntilRunning();
364 
365  // Send ping
366  cevb.runInEventBaseThread([&]() { client->start(server->address()); });
367 
368  // Wait for client to finish
369  clientThread.join();
370  return std::move(client);
371 }
372 
374  int gso = 1000;
375  int in[] = {100, 1200, 3000, 200, 100, 300};
376  int expected[] = {1000, 1000, 1000, 1000, 900};
377  TestData testData(
378  gso,
379  true /*useSocketGSO*/,
380  in,
381  sizeof(in) / sizeof(in[0]),
382  expected,
383  sizeof(expected) / sizeof(expected[0]));
384  ASSERT_TRUE(testData.checkIn());
385  startServer();
386  auto pingClient = performPingPongTest(testData, folly::none);
387  ASSERT_TRUE(testData.checkOut());
388 }
389 
390 TEST_F(AsyncSocketGSOIntegrationTest, PingPongRequestGSO) {
391  int gso = 421;
392  int in[] = {100, 1200, 3000, 200, 100, 300};
393  int expected[] = {421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 269};
394  TestData testData(
395  gso,
396  false /*useSocketGSO*/,
397  in,
398  sizeof(in) / sizeof(in[0]),
399  expected,
400  sizeof(expected) / sizeof(expected[0]));
401  ASSERT_TRUE(testData.checkIn());
402  startServer();
403  auto pingClient = performPingPongTest(testData, folly::none);
404  ASSERT_TRUE(testData.checkOut());
405 }
406 
407 // buffer sizes
408 constexpr auto kGSO1 = 100;
409 constexpr auto kGSO2 = 200;
410 constexpr auto kGSO = kGSO1 + kGSO2;
411 
412 class GSOBuf {
413  public:
414  explicit GSOBuf(size_t size1, size_t size2 = 0) {
415  std::string str(size1, 'A');
416  ioBuf_ = folly::IOBuf::copyBuffer(str.data(), str.size());
417 
418  if (size2) {
419  str = std::string(size2, 'B');
420  auto tmp = folly::IOBuf::copyBuffer(str.data(), str.size());
421  ioBuf_->prependChain(std::move(tmp));
422  }
423  }
424 
425  const std::unique_ptr<IOBuf>& get() const {
426  return ioBuf_;
427  }
428 
429  private:
430  std::unique_ptr<IOBuf> ioBuf_;
431 };
432 
433 class GSOSendTest {
434  public:
435  explicit GSOSendTest(
437  const folly::SocketAddress& address,
438  int gso,
439  size_t size1,
440  size_t size2 = 0) {
441  GSOBuf buf(size1, size2);
442 
443  ret_ = socket.writeGSO(address, buf.get(), gso);
444  }
445 
446  ssize_t get() const {
447  return ret_;
448  }
449 
450  private:
451  ssize_t ret_;
452 };
453 
454 TEST(AsyncSocketGSOTest, send) {
455  EventBase evb;
456  folly::AsyncUDPSocket client(&evb);
457  client.bind(folly::SocketAddress("127.0.0.1", 0));
458  if (client.getGSO() < 0) {
459  LOG(INFO) << "GSO not supported";
460  // GSO not supported
461  return;
462  }
463 
464  folly::AsyncUDPSocket server(&evb);
465  server.bind(folly::SocketAddress("127.0.0.1", 0));
466 
467  // send less than GSO in a single IOBuf
468  {
469  GSOSendTest test(client, server.address(), kGSO, kGSO - 1);
470  CHECK_LT(test.get(), 0);
471  }
472 
473  // send less than GSO in multiple IOBufs
474  {
475  GSOSendTest test(client, server.address(), kGSO, kGSO1 - 1, kGSO2);
476  CHECK_LT(test.get(), 0);
477  }
478 
479  // send GSO in a single IOBuf
480  {
481  GSOSendTest test(client, server.address(), kGSO, kGSO);
482  CHECK_LT(test.get(), 0);
483  }
484 
485  // send GSO in multiple IOBuf
486  {
487  GSOSendTest test(client, server.address(), kGSO, kGSO1, kGSO2);
488  CHECK_LT(test.get(), 0);
489  }
490 
491  // send more than GSO in a single IOBuf
492  {
493  GSOSendTest test(client, server.address(), kGSO, kGSO + 1);
494  CHECK_EQ(test.get(), kGSO + 1);
495  }
496 
497  // send more than GSO in a multiple IOBufs
498  {
499  GSOSendTest test(client, server.address(), kGSO, kGSO1 + 1, kGSO2 + 1);
500  CHECK_EQ(test.get(), kGSO + 2);
501  }
502 }
virtual const folly::SocketAddress & address() const
TestData(int gso, bool useSocketGSO, int *in, size_t inLen, int *expected, size_t expectedLen)
std::vector< std::thread > threads_
bool checkIn() const
constexpr auto kGSO
AsyncUDPSocket & getSocket()
size_t writePing(IOBufQueue &queue, uint64_t opaqueData, bool ack) noexcept
folly::SocketAddress server_
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
GSOBuf(size_t size1, size_t size2=0)
std::unique_ptr< IOBuf > ioBuf_
void accumulate(std::vector< std::size_t > &a, std::vector< std::size_t > const &d)
Definition: F14TestUtil.h:58
std::unique_ptr< AsyncUDPServerSocket > socket_
void onDataAvailable(const folly::SocketAddress &, size_t len, bool) noexceptoverride
constexpr auto kGSO2
UDPAcceptor(EventBase *evb)
std::vector< int > out_
TEST_F(TestInfoTest, Names)
std::unique_ptr< folly::IOBuf > getInBuf()
std::unique_ptr< AsyncUDPSocket > socket_
void onDataAvailable(std::shared_ptr< folly::AsyncUDPSocket > socket, const folly::SocketAddress &client, std::unique_ptr< folly::IOBuf > data, bool) noexceptoverride
virtual ssize_t writeGSO(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf, int gso)
void onListenStarted() noexceptoverride
static uint64_t test(std::string name, bool fc_, bool dedicated_, bool tc_, bool syncops_, uint64_t base)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::Optional< folly::SocketAddress > connectAddr_
void onListenStopped() noexceptoverride
bool appendOut(int num)
std::unique_ptr< IOBuf > clone() const
Definition: IOBuf.cpp:527
UDPServer(EventBase *evb, folly::SocketAddress addr, int n)
requires E e noexcept(noexcept(s.error(std::move(e))))
std::vector< int > in_
UDPClient(EventBase *evb, TestData &testData)
std::unique_ptr< UDPClient > client
bool isInEventBaseThread() const
Definition: EventBase.h:504
std::vector< folly::EventBase > evbs_
EventBase * evb_
virtual void writePing(std::unique_ptr< folly::IOBuf > buf, int gso)
void start(const folly::SocketAddress &server)
TEST(GTestEnvVarTest, Dummy)
void terminateLoopSoon()
Definition: EventBase.cpp:493
Input & in_
Definition: json.cpp:342
AsyncServerSocket::UniquePtr socket_
void timeoutExpired() noexceptoverride
std::unique_ptr< std::thread > serverThread
std::unique_ptr< UDPServer > server
void shutdown(Counter &)
bool checkOut() const
std::unique_ptr< UDPClient > performPingPongTest(TestData &testData, folly::Optional< folly::SocketAddress > connectedAddress)
ssize_t send(NetworkSocket s, const void *buf, size_t len, int flags)
Definition: NetOps.cpp:319
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::vector< UDPAcceptor > acceptors_
virtual void bind(const folly::SocketAddress &address)
void prependChain(std::unique_ptr< IOBuf > &&iobuf)
Definition: IOBuf.cpp:509
void onReadClosed() noexceptoverride
std::string & out_
Definition: json.cpp:185
void setShouldConnect(const folly::SocketAddress &connectAddr)
const folly::SocketAddress addr_
const char * string
Definition: Conv.cpp:212
std::vector< int > expected_
const std::unique_ptr< IOBuf > & get() const
void onReadError(const folly::AsyncSocketException &ex) noexceptoverride
void getReadBuffer(void **buf, size_t *len) noexceptoverride
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
ThreadPoolListHook * addr
folly::SocketAddress address() const
constexpr auto kGSO1
constexpr None none
Definition: Optional.h:87
GSOSendTest(folly::AsyncUDPSocket &socket, const folly::SocketAddress &address, int gso, size_t size1, size_t size2=0)