proxygen
AsyncUDPSocketTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thread>
18 
19 #include <folly/Conv.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/String.h>
22 #include <folly/io/IOBuf.h>
30 
34 using folly::errnoStr;
35 using folly::EventBase;
36 using folly::IOBuf;
38 using namespace testing;
39 
40 class UDPAcceptor : public AsyncUDPServerSocket::Callback {
41  public:
42  UDPAcceptor(EventBase* evb, int n, bool changePortForWrites)
43  : evb_(evb), n_(n), changePortForWrites_(changePortForWrites) {}
44 
45  void onListenStarted() noexcept override {}
46 
47  void onListenStopped() noexcept override {}
48 
50  std::shared_ptr<folly::AsyncUDPSocket> socket,
51  const folly::SocketAddress& client,
52  std::unique_ptr<folly::IOBuf> data,
53  bool truncated) noexcept override {
54  lastClient_ = client;
55  lastMsg_ = data->clone()->moveToFbString().toStdString();
56 
57  auto len = data->computeChainDataLength();
58  VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
59  << "(trun:" << truncated << ") from " << client.describe() << " - "
60  << lastMsg_;
61 
62  sendPong(socket);
63  }
64 
65  void sendPong(std::shared_ptr<folly::AsyncUDPSocket> socket) noexcept {
66  try {
67  auto writeSocket = socket;
68  if (changePortForWrites_) {
69  writeSocket = std::make_shared<folly::AsyncUDPSocket>(evb_);
70  writeSocket->setReuseAddr(false);
71  writeSocket->bind(folly::SocketAddress("127.0.0.1", 0));
72  }
73  writeSocket->write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
74  } catch (const std::exception& ex) {
75  VLOG(4) << "Failed to send PONG " << ex.what();
76  }
77  }
78 
79  private:
80  EventBase* const evb_{nullptr};
81  const int n_{-1};
82  // Whether to create a new port per write.
83  bool changePortForWrites_{true};
84 
87 };
88 
89 class UDPServer {
90  public:
92  : evb_(evb), addr_(addr), evbs_(n) {}
93 
94  void start() {
95  CHECK(evb_->isInEventBaseThread());
96 
97  socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
98 
99  try {
100  socket_->bind(addr_);
101  VLOG(4) << "Server listening on " << socket_->address().describe();
102  } catch (const std::exception& ex) {
103  LOG(FATAL) << ex.what();
104  }
105 
106  acceptors_.reserve(evbs_.size());
107  threads_.reserve(evbs_.size());
108 
109  // Add numWorkers thread
110  int i = 0;
111  for (auto& evb : evbs_) {
112  acceptors_.emplace_back(&evb, i, changePortForWrites_);
113 
114  std::thread t([&]() { evb.loopForever(); });
115 
116  evb.waitUntilRunning();
117 
118  socket_->addListener(&evb, &acceptors_[i]);
119  threads_.emplace_back(std::move(t));
120  ++i;
121  }
122 
123  socket_->listen();
124  }
125 
127  return socket_->address();
128  }
129 
130  void shutdown() {
131  CHECK(evb_->isInEventBaseThread());
132  socket_->close();
133  socket_.reset();
134 
135  for (auto& evb : evbs_) {
136  evb.terminateLoopSoon();
137  }
138 
139  for (auto& t : threads_) {
140  t.join();
141  }
142  }
143 
144  void pauseAccepting() {
145  socket_->pauseAccepting();
146  }
147 
149  socket_->resumeAccepting();
150  }
151 
152  // Whether writes from the UDP server should change the port for each message.
153  void setChangePortForWrites(bool changePortForWrites) {
154  changePortForWrites_ = changePortForWrites;
155  }
156 
157  private:
158  EventBase* const evb_{nullptr};
159  const folly::SocketAddress addr_;
160 
161  std::unique_ptr<AsyncUDPServerSocket> socket_;
162  std::vector<std::thread> threads_;
163  std::vector<folly::EventBase> evbs_;
164  std::vector<UDPAcceptor> acceptors_;
165  bool changePortForWrites_{true};
166 };
167 
168 class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
169  public:
170  explicit UDPClient(EventBase* evb) : AsyncTimeout(evb), evb_(evb) {}
171 
172  void start(const folly::SocketAddress& server, int n) {
173  CHECK(evb_->isInEventBaseThread());
174  server_ = server;
175  socket_ = std::make_unique<AsyncUDPSocket>(evb_);
176 
177  try {
178  socket_->bind(folly::SocketAddress("127.0.0.1", 0));
179  if (connectAddr_) {
180  connect();
181  }
182  VLOG(2) << "Client bound to " << socket_->address().describe();
183  } catch (const std::exception& ex) {
184  LOG(FATAL) << ex.what();
185  }
186 
187  socket_->resumeRead(this);
188 
189  n_ = n;
190 
191  // Start playing ping pong
192  sendPing();
193  }
194 
195  void connect() {
196  int ret = socket_->connect(*connectAddr_);
197  if (ret != 0) {
199  folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
200  }
201  VLOG(2) << "Client connected to address=" << *connectAddr_;
202  }
203 
204  void shutdown() {
205  CHECK(evb_->isInEventBaseThread());
206  socket_->pauseRead();
207  socket_->close();
208  socket_.reset();
210  }
211 
212  void sendPing() {
213  if (n_ == 0) {
214  shutdown();
215  return;
216  }
217 
218  --n_;
219  scheduleTimeout(5);
220  writePing(folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
221  }
222 
223  virtual void writePing(std::unique_ptr<folly::IOBuf> buf) {
224  socket_->write(server_, std::move(buf));
225  }
226 
227  void getReadBuffer(void** buf, size_t* len) noexcept override {
228  *buf = buf_;
229  *len = 1024;
230  }
231 
233  const folly::SocketAddress& client,
234  size_t len,
235  bool truncated) noexcept override {
236  VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
237  << client.describe() << " - " << std::string(buf_, len);
238  VLOG(4) << n_ << " left";
239 
240  ++pongRecvd_;
241 
242  sendPing();
243  }
244 
246  VLOG(4) << ex.what();
247 
248  // Start listening for next PONG
249  socket_->resumeRead(this);
250  }
251 
252  void onReadClosed() noexcept override {
253  CHECK(false) << "We unregister reads before closing";
254  }
255 
256  void timeoutExpired() noexcept override {
257  VLOG(4) << "Timeout expired";
258  sendPing();
259  }
260 
261  int pongRecvd() const {
262  return pongRecvd_;
263  }
264 
266  return *socket_;
267  }
268 
269  void setShouldConnect(const folly::SocketAddress& connectAddr) {
270  connectAddr_ = connectAddr;
271  }
272 
273  protected:
275  EventBase* const evb_{nullptr};
276 
277  folly::SocketAddress server_;
278  std::unique_ptr<AsyncUDPSocket> socket_;
279 
280  private:
281  int pongRecvd_{0};
282 
283  int n_{0};
284  char buf_[1024];
285 };
286 
288  public:
289  ~ConnectedWriteUDPClient() override = default;
290 
292 
293  // When the socket is connected you don't need to supply the address to send
294  // msg. This will test that connect worked.
295  void writePing(std::unique_ptr<folly::IOBuf> buf) override {
296  iovec vec[16];
297  size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0]));
298  if (UNLIKELY(iovec_len == 0)) {
299  buf->coalesce();
300  vec[0].iov_base = const_cast<uint8_t*>(buf->data());
301  vec[0].iov_len = buf->length();
302  iovec_len = 1;
303  }
304 
305  struct msghdr msg;
306  msg.msg_name = nullptr;
307  msg.msg_namelen = 0;
308  msg.msg_iov = const_cast<struct iovec*>(vec);
309  msg.msg_iovlen = iovec_len;
310  msg.msg_control = nullptr;
311  msg.msg_controllen = 0;
312  msg.msg_flags = 0;
313 
314  ssize_t ret = ::sendmsg(socket_->getFD(), &msg, 0);
315  if (ret == -1) {
316  if (errno != EAGAIN || errno != EWOULDBLOCK) {
318  folly::AsyncSocketException::NOT_OPEN, "WriteFail", errno);
319  }
320  }
321  connect();
322  }
323 };
324 
326  public:
327  void SetUp() override {
328  server = std::make_unique<UDPServer>(
329  &sevb, folly::SocketAddress("127.0.0.1", 0), 4);
330 
331  // Start event loop in a separate thread
332  serverThread =
333  std::make_unique<std::thread>([this]() { sevb.loopForever(); });
334 
335  // Wait for event loop to start
336  sevb.waitUntilRunning();
337  }
338 
339  void startServer() {
340  // Start the server
341  sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
342  LOG(INFO) << "Server listening=" << server->address();
343  }
344 
345  void TearDown() override {
346  // Shutdown server
347  sevb.runInEventBaseThread([&]() {
348  server->shutdown();
349  sevb.terminateLoopSoon();
350  });
351 
352  // Wait for server thread to join
353  serverThread->join();
354  }
355 
356  std::unique_ptr<UDPClient> performPingPongTest(
357  folly::Optional<folly::SocketAddress> connectedAddress,
358  bool useConnectedWrite);
359 
362  std::unique_ptr<std::thread> serverThread;
363  std::unique_ptr<UDPServer> server;
364  std::unique_ptr<UDPClient> client;
365 };
366 
368  folly::Optional<folly::SocketAddress> connectedAddress,
369  bool useConnectedWrite) {
370  if (useConnectedWrite) {
371  CHECK(connectedAddress.hasValue());
372  client = std::make_unique<ConnectedWriteUDPClient>(&cevb);
373  client->setShouldConnect(*connectedAddress);
374  } else {
375  client = std::make_unique<UDPClient>(&cevb);
376  if (connectedAddress) {
377  client->setShouldConnect(*connectedAddress);
378  }
379  }
380  // Start event loop in a separate thread
381  auto clientThread = std::thread([this]() { cevb.loopForever(); });
382 
383  // Wait for event loop to start
384  cevb.waitUntilRunning();
385 
386  // Send ping
387  cevb.runInEventBaseThread([&]() { client->start(server->address(), 100); });
388 
389  // Wait for client to finish
390  clientThread.join();
391  return std::move(client);
392 }
393 
395  startServer();
396  auto pingClient = performPingPongTest(folly::none, false);
397  // This should succeed.
398  ASSERT_GT(pingClient->pongRecvd(), 0);
399 }
400 
401 TEST_F(AsyncSocketIntegrationTest, ConnectedPingPong) {
402  server->setChangePortForWrites(false);
403  startServer();
404  auto pingClient = performPingPongTest(server->address(), false);
405  // This should succeed
406  ASSERT_GT(pingClient->pongRecvd(), 0);
407 }
408 
409 TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongServerWrongAddress) {
410  server->setChangePortForWrites(true);
411  startServer();
412  auto pingClient = performPingPongTest(server->address(), false);
413  // This should fail.
414  ASSERT_EQ(pingClient->pongRecvd(), 0);
415 }
416 
417 TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongClientWrongAddress) {
418  server->setChangePortForWrites(false);
419  startServer();
420  folly::SocketAddress connectAddr(
421  server->address().getIPAddress(), server->address().getPort() + 1);
422  auto pingClient = performPingPongTest(connectAddr, false);
423  // This should fail.
424  ASSERT_EQ(pingClient->pongRecvd(), 0);
425 }
426 
427 TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsg) {
428  server->setChangePortForWrites(false);
429  startServer();
430  auto pingClient = performPingPongTest(server->address(), true);
431  // This should succeed.
432  ASSERT_GT(pingClient->pongRecvd(), 0);
433 }
434 
435 TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsgServerWrongAddr) {
436  server->setChangePortForWrites(true);
437  startServer();
438  auto pingClient = performPingPongTest(server->address(), true);
439  // This should fail.
440  ASSERT_EQ(pingClient->pongRecvd(), 0);
441 }
442 
443 TEST_F(AsyncSocketIntegrationTest, PingPongPauseResumeListening) {
444  startServer();
445 
446  // Exchange should not happen when paused.
447  server->pauseAccepting();
448  auto pausedClient = performPingPongTest(folly::none, false);
449  ASSERT_EQ(pausedClient->pongRecvd(), 0);
450 
451  // Exchange does occur after resuming.
452  server->resumeAccepting();
453  auto pingClient = performPingPongTest(folly::none, false);
454  ASSERT_GT(pingClient->pongRecvd(), 0);
455 }
456 
458  public:
459  explicit TestAsyncUDPSocket(EventBase* evb) : AsyncUDPSocket(evb) {}
460 
461  MOCK_METHOD3(
462  sendmsg,
463  ssize_t(folly::NetworkSocket, const struct msghdr*, int));
464 };
465 
466 class MockErrMessageCallback : public AsyncUDPSocket::ErrMessageCallback {
467  public:
468  ~MockErrMessageCallback() override = default;
469 
470  MOCK_METHOD1(errMessage_, void(const cmsghdr&));
471  void errMessage(const cmsghdr& cmsg) noexcept override {
472  errMessage_(cmsg);
473  }
474 
475  MOCK_METHOD1(errMessageError_, void(const folly::AsyncSocketException&));
477  const folly::AsyncSocketException& ex) noexcept override {
478  errMessageError_(ex);
479  }
480 };
481 
482 class MockUDPReadCallback : public AsyncUDPSocket::ReadCallback {
483  public:
484  ~MockUDPReadCallback() override = default;
485 
486  MOCK_METHOD2(getReadBuffer_, void(void**, size_t*));
487  void getReadBuffer(void** buf, size_t* len) noexcept override {
488  getReadBuffer_(buf, len);
489  }
490 
491  MOCK_METHOD3(
492  onDataAvailable_,
493  void(const folly::SocketAddress&, size_t, bool));
495  const folly::SocketAddress& client,
496  size_t len,
497  bool truncated) noexcept override {
498  onDataAvailable_(client, len, truncated);
499  }
500 
501  MOCK_METHOD1(onReadError_, void(const folly::AsyncSocketException&));
503  onReadError_(ex);
504  }
505 
506  MOCK_METHOD0(onReadClosed_, void());
507  void onReadClosed() noexcept override {
508  onReadClosed_();
509  }
510 };
511 
512 class AsyncUDPSocketTest : public Test {
513  public:
514  void SetUp() override {
515  socket_ = std::make_shared<AsyncUDPSocket>(&evb_);
516  addr_ = folly::SocketAddress("127.0.0.1", 0);
517  socket_->bind(addr_);
518  }
519 
523  std::shared_ptr<AsyncUDPSocket> socket_;
525 };
526 
527 TEST_F(AsyncUDPSocketTest, TestConnect) {
528  EXPECT_EQ(socket_->connect(addr_), 0);
529 }
530 
531 TEST_F(AsyncUDPSocketTest, TestErrToNonExistentServer) {
532  socket_->resumeRead(&readCb);
533  socket_->setErrMessageCallback(&err);
534  folly::SocketAddress addr("127.0.0.1", 10000);
535  bool errRecvd = false;
536 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
537  EXPECT_CALL(err, errMessage_(_))
538  .WillOnce(Invoke([this, &errRecvd](auto& cmsg) {
539  if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
540  (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
541  const struct sock_extended_err* serr =
542  reinterpret_cast<const struct sock_extended_err*>(
543  CMSG_DATA(&cmsg));
544  errRecvd =
545  (serr->ee_origin == SO_EE_ORIGIN_ICMP || SO_EE_ORIGIN_ICMP6);
546  LOG(ERROR) << "errno " << errnoStr(serr->ee_errno);
547  }
549  }));
550 #endif // FOLLY_HAVE_MSG_ERRQUEUE
551  socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
552  evb_.loopForever();
553  EXPECT_TRUE(errRecvd);
554 }
555 
556 TEST_F(AsyncUDPSocketTest, TestUnsetErrCallback) {
557  socket_->resumeRead(&readCb);
558  socket_->setErrMessageCallback(&err);
559  socket_->setErrMessageCallback(nullptr);
560  folly::SocketAddress addr("127.0.0.1", 10000);
561  EXPECT_CALL(err, errMessage_(_)).Times(0);
562  socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
564  [&] { evb_.terminateLoopSoon(); }, std::chrono::milliseconds(30));
565  evb_.loopForever();
566 }
567 
568 TEST_F(AsyncUDPSocketTest, CloseInErrorCallback) {
569  socket_->resumeRead(&readCb);
570  socket_->setErrMessageCallback(&err);
571  folly::SocketAddress addr("127.0.0.1", 10000);
572  bool errRecvd = false;
573  EXPECT_CALL(err, errMessage_(_)).WillOnce(Invoke([this, &errRecvd](auto&) {
574  errRecvd = true;
575  socket_->close();
577  }));
578  socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
579  socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
580  evb_.loopForever();
581  EXPECT_TRUE(errRecvd);
582 }
583 
584 TEST_F(AsyncUDPSocketTest, TestNonExistentServerNoErrCb) {
585  socket_->resumeRead(&readCb);
586  folly::SocketAddress addr("127.0.0.1", 10000);
587  bool errRecvd = false;
589  EXPECT_CALL(readCb, getReadBuffer_(_, _))
590  .WillRepeatedly(Invoke([&readBuf](void** buf, size_t* len) {
591  auto readSpace = readBuf.preallocate(2000, 10000);
592  *buf = readSpace.first;
593  *len = readSpace.second;
594  }));
595  ON_CALL(readCb, onReadError_(_)).WillByDefault(Invoke([&errRecvd](auto& ex) {
596  LOG(ERROR) << ex.what();
597  errRecvd = true;
598  }));
599  socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
601  [&] { evb_.terminateLoopSoon(); }, std::chrono::milliseconds(30));
602  evb_.loopForever();
603  EXPECT_FALSE(errRecvd);
604 }
605 
608  EXPECT_FALSE(socket.isBound());
609  folly::SocketAddress address("0.0.0.0", 0);
610  socket.bind(address);
611  EXPECT_TRUE(socket.isBound());
612 }
613 
614 TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbWithReadCallback) {
615  socket_->resumeRead(&readCb);
616  EXPECT_TRUE(socket_->isHandlerRegistered());
617  socket_->detachEventBase();
618  EXPECT_FALSE(socket_->isHandlerRegistered());
619  socket_->attachEventBase(&evb_);
620  EXPECT_TRUE(socket_->isHandlerRegistered());
621 }
622 
623 TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbNoReadCallback) {
624  EXPECT_FALSE(socket_->isHandlerRegistered());
625  socket_->detachEventBase();
626  EXPECT_FALSE(socket_->isHandlerRegistered());
627  socket_->attachEventBase(&evb_);
628  EXPECT_FALSE(socket_->isHandlerRegistered());
629 }
630 
631 TEST_F(AsyncUDPSocketTest, TestDetachAttach) {
632  folly::EventBase evb2;
633  auto writeSocket = std::make_shared<folly::AsyncUDPSocket>(&evb_);
634  folly::SocketAddress address("127.0.0.1", 0);
635  writeSocket->bind(address);
636  std::array<uint8_t, 1024> data;
637  std::atomic<int> packetsRecvd{0};
638  EXPECT_CALL(readCb, getReadBuffer_(_, _))
639  .WillRepeatedly(Invoke([&](void** buf, size_t* len) {
640  *buf = data.data();
641  *len = 1024;
642  }));
643  EXPECT_CALL(readCb, onDataAvailable_(_, _, _))
644  .WillRepeatedly(Invoke(
645  [&](const folly::SocketAddress&, size_t, bool) { packetsRecvd++; }));
646  socket_->resumeRead(&readCb);
647  writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
648  while (packetsRecvd != 1) {
649  evb_.loopOnce();
650  }
651  EXPECT_EQ(packetsRecvd, 1);
652 
653  socket_->detachEventBase();
654  std::thread t([&] { evb2.loopForever(); });
655  evb2.runInEventBaseThreadAndWait([&] { socket_->attachEventBase(&evb2); });
656  writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
658  while (packetsRecvd != 2 ||
660  now + std::chrono::milliseconds(10)) {
661  std::this_thread::sleep_for(std::chrono::milliseconds(1));
662  }
663  evb2.runInEventBaseThread([&] {
664  socket_ = nullptr;
665  evb2.terminateLoopSoon();
666  });
667  t.join();
668  EXPECT_EQ(packetsRecvd, 2);
669 }
void onDataAvailable(std::shared_ptr< folly::AsyncUDPSocket > socket, const folly::SocketAddress &client, std::unique_ptr< folly::IOBuf > data, bool truncated) noexceptoverride
#define ASSERT_GT(val1, val2)
Definition: gtest.h:1976
AsyncUDPSocket & getSocket()
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
Definition: Types-inl.h:220
size_t writePing(IOBufQueue &queue, uint64_t opaqueData, bool ack) noexcept
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
ssize_t sendmsg(NetworkSocket socket, const msghdr *message, int flags)
Definition: NetOps.cpp:328
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::string lastMsg_
TEST_F(TestInfoTest, Names)
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
void writePing(std::unique_ptr< folly::IOBuf > buf) override
void errMessageError(const folly::AsyncSocketException &ex) noexceptoverride
ByteRange coalesce()
Definition: IOBuf.h:1095
void errMessage(const cmsghdr &cmsg) noexceptoverride
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void onListenStarted() noexceptoverride
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
virtual void writePing(std::unique_ptr< folly::IOBuf > buf)
std::unique_ptr< UDPServer > server
void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexceptoverride
const uint8_t * data() const
Definition: IOBuf.h:499
void onListenStopped() noexceptoverride
UDPServer(EventBase *evb, folly::SocketAddress addr, int n)
requires E e noexcept(noexcept(s.error(std::move(e))))
std::unique_ptr< UDPClient > performPingPongTest(folly::Optional< folly::SocketAddress > connectedAddress, bool useConnectedWrite)
void start(const folly::SocketAddress &server, int n)
folly::SocketAddress addr_
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
#define MOCK_METHOD3(m,...)
UDPAcceptor(EventBase *evb, int n, bool changePortForWrites)
size_t fillIov(struct iovec *iov, size_t len) const
Definition: IOBuf.cpp:1072
TestAsyncUDPSocket(EventBase *evb)
bool isInEventBaseThread() const
Definition: EventBase.h:504
folly::SocketAddress lastClient_
EventBase * evb_
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void onReadClosed() noexceptoverride
void terminateLoopSoon()
Definition: EventBase.cpp:493
AsyncServerSocket::UniquePtr socket_
std::size_t length() const
Definition: IOBuf.h:533
void timeoutExpired() noexceptoverride
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
Definition: Optional.h:300
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
void onReadError(const folly::AsyncSocketException &ex) noexceptoverride
void shutdown(Counter &)
Definition: Traits.h:588
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
virtual void bind(const folly::SocketAddress &address)
void scheduleTimeoutFn(F fn, std::chrono::milliseconds timeout)
Definition: HHWheelTimer.h:229
void onReadClosed() noexceptoverride
fbstring errnoStr(int err)
Definition: String.cpp:463
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void setShouldConnect(const folly::SocketAddress &connectAddr)
#define ON_CALL(obj, call)
std::unique_ptr< std::thread > serverThread
#define MOCK_METHOD1(m,...)
const char * string
Definition: Conv.cpp:212
MockErrMessageCallback err
void setChangePortForWrites(bool changePortForWrites)
vector< string > vec
Definition: StringTest.cpp:35
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
UDPClient(EventBase *evb)
void onReadError(const folly::AsyncSocketException &ex) noexceptoverride
void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexceptoverride
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void getReadBuffer(void **buf, size_t *len) noexceptoverride
int pongRecvd() const
void sendPong(std::shared_ptr< folly::AsyncUDPSocket > socket) noexcept
virtual bool isBound() const
void getReadBuffer(void **buf, size_t *len) noexceptoverride
#define UNLIKELY(x)
Definition: Likely.h:48
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
#define MOCK_METHOD2(m,...)
std::shared_ptr< AsyncUDPSocket > socket_
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
ThreadPoolListHook * addr
folly::SocketAddress address() const
ConnectedWriteUDPClient(EventBase *evb)
HHWheelTimer & timer()
Definition: EventBase.h:526
constexpr None none
Definition: Optional.h:87
std::unique_ptr< UDPClient > client
MockUDPReadCallback readCb
#define MOCK_METHOD0(m,...)