proxygen
AsyncSocketTest2.cpp File Reference

Go to the source code of this file.

Classes

class  DelayedWrite
 
class  AsyncSocketConnectTest
 
class  AsyncSocketImmediateRead
 
class  MockEvbChangeCallback
 

Enumerations

enum  TFOState { TFOState::DISABLED, TFOState::ENABLED }
 

Functions

 TEST (AsyncSocketTest, Connect)
 
std::vector< TFOStategetTestingValues ()
 
 INSTANTIATE_TEST_CASE_P (ConnectTests, AsyncSocketConnectTest,::testing::ValuesIn(getTestingValues()))
 
 TEST (AsyncSocketTest, ConnectRefused)
 
 TEST (AsyncSocketTest, ConnectTimeout)
 
 TEST_P (AsyncSocketConnectTest, ConnectAndWrite)
 
 TEST_P (AsyncSocketConnectTest, ConnectNullCallback)
 
 TEST_P (AsyncSocketConnectTest, ConnectWriteAndClose)
 
 TEST (AsyncSocketTest, ConnectAndClose)
 
 TEST (AsyncSocketTest, ConnectAndCloseNow)
 
 TEST (AsyncSocketTest, ConnectWriteAndCloseNow)
 
 TEST_P (AsyncSocketConnectTest, ConnectAndRead)
 
 TEST (AsyncSocketTest, ConnectReadAndClose)
 
 TEST_P (AsyncSocketConnectTest, ConnectWriteAndRead)
 
 TEST (AsyncSocketTest, ConnectWriteAndShutdownWrite)
 
 TEST (AsyncSocketTest, ConnectReadWriteAndShutdownWrite)
 
 TEST (AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow)
 
void tmpDisableReads (AsyncSocket *socket, ReadCallback *rcb)
 
void testConnectOptWrite (size_t size1, size_t size2, bool close=false)
 
 TEST (AsyncSocketTest, ConnectCallbackWrite)
 
 TEST (AsyncSocketTest, WriteNullCallback)
 
 TEST (AsyncSocketTest, WriteTimeout)
 
 TEST (AsyncSocketTest, WritePipeError)
 
 TEST (AsyncSocketTest, WriteAfterReadEOF)
 
 TEST (AsyncSocketTest, WriteErrorCallbackBytesWritten)
 
 TEST (AsyncSocketTest, WriteIOBuf)
 
 TEST (AsyncSocketTest, WriteIOBufCorked)
 
 TEST (AsyncSocketTest, ZeroLengthWrite)
 
 TEST (AsyncSocketTest, ZeroLengthWritev)
 
 TEST (AsyncSocketTest, ClosePendingWritesWhileClosing)
 
 TEST (AsyncSocket, ConnectReadImmediateRead)
 
 TEST (AsyncSocket, ConnectReadUninstallRead)
 
 TEST (AsyncSocketTest, ServerAcceptOptions)
 
 TEST (AsyncSocketTest, RemoveAcceptCallback)
 
 TEST (AsyncSocketTest, OtherThreadAcceptCallback)
 
void serverSocketSanityTest (AsyncServerSocket *serverSocket)
 
 TEST (AsyncSocketTest, DestroyCloseTest)
 
 TEST (AsyncSocketTest, ServerExistingSocket)
 
 TEST (AsyncSocketTest, UnixDomainSocketTest)
 
 TEST (AsyncSocketTest, ConnectionEventCallbackDefault)
 
 TEST (AsyncSocketTest, CallbackInPrimaryEventBase)
 
 TEST (AsyncSocketTest, CallbackInSecondaryEventBase)
 
 TEST (AsyncSocketTest, NumPendingMessagesInQueue)
 
 TEST (AsyncSocketTest, BufferTest)
 
 TEST (AsyncSocketTest, BufferCallbackKill)
 
 TEST (AsyncSocketTest, EvbCallbacks)
 
 TEST (AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers)
 
 TEST (AsyncSocket, PreReceivedData)
 
 TEST (AsyncSocket, PreReceivedDataOnly)
 
 TEST (AsyncSocket, PreReceivedDataPartial)
 
 TEST (AsyncSocket, PreReceivedDataTakeover)
 

Enumeration Type Documentation

enum TFOState
strong
Enumerator
DISABLED 
ENABLED 

Definition at line 118 of file AsyncSocketTest2.cpp.

118  {
119  DISABLED,
120  ENABLED,
121 };

Function Documentation

std::vector<TFOState> getTestingValues ( )

Definition at line 125 of file AsyncSocketTest2.cpp.

References DISABLED, ENABLED, and INSTANTIATE_TEST_CASE_P().

125  {
126  std::vector<TFOState> vals;
127  vals.emplace_back(TFOState::DISABLED);
128 
129 #if FOLLY_ALLOW_TFO
130  vals.emplace_back(TFOState::ENABLED);
131 #endif
132  return vals;
133 }
INSTANTIATE_TEST_CASE_P ( ConnectTests  ,
AsyncSocketConnectTest  ,
::testing::ValuesIn(getTestingValues())   
)

Referenced by getTestingValues().

void serverSocketSanityTest ( AsyncServerSocket serverSocket)

Definition at line 1911 of file AsyncSocketTest2.cpp.

References folly::AsyncServerSocket::addAcceptCallback(), ASSERT_EQ, folly::AsyncServerSocket::getAddress(), folly::AsyncServerSocket::getEventBase(), folly::test::TestAcceptCallback::getEvents(), folly::EventBase::loop(), folly::AsyncServerSocket::removeAcceptCallback(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::netops::socket(), folly::AsyncServerSocket::startAccepting(), folly::test::TestAcceptCallback::TYPE_ACCEPT, folly::test::TestAcceptCallback::TYPE_START, and folly::test::TestAcceptCallback::TYPE_STOP.

Referenced by TEST().

1911  {
1912  EventBase* eventBase = serverSocket->getEventBase();
1913  CHECK(eventBase);
1914 
1915  // Add a callback to accept one connection then stop accepting
1916  TestAcceptCallback acceptCallback;
1917  acceptCallback.setConnectionAcceptedFn(
1918  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1919  serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
1920  });
1921  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1922  serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
1923  });
1924  serverSocket->addAcceptCallback(&acceptCallback, eventBase);
1925  serverSocket->startAccepting();
1926 
1927  // Connect to the server socket
1928  folly::SocketAddress serverAddress;
1929  serverSocket->getAddress(&serverAddress);
1930  AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1931 
1932  // Loop to process all events
1933  eventBase->loop();
1934 
1935  // Verify that the server accepted a connection
1936  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1937  ASSERT_EQ(
1938  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1939  ASSERT_EQ(
1940  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1941  ASSERT_EQ(
1942  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1943 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
EventBase * getEventBase() const override
std::deque< EventInfo > * getEvents()
void getAddress(SocketAddress *addressReturn) const override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)
TEST ( AsyncSocketTest  ,
Connect   
)

Test connecting to a server

Definition at line 96 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, EXPECT_EQ, EXPECT_GE, EXPECT_LE, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), now(), folly::netops::socket(), ConnCallback::state, and folly::STATE_SUCCEEDED.

96  {
97  // Start listening on a local port
98  TestServer server;
99 
100  // Connect using a AsyncSocket
101  EventBase evb;
102  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
103  ConnCallback cb;
104  const auto startedAt = std::chrono::steady_clock::now();
105  socket->connect(&cb, server.getAddress(), 30);
106 
107  evb.loop();
108  const auto finishedAt = std::chrono::steady_clock::now();
109 
111  EXPECT_LE(0, socket->getConnectTime().count());
112  EXPECT_GE(socket->getConnectStartTime(), startedAt);
113  EXPECT_LE(socket->getConnectStartTime(), socket->getConnectEndTime());
114  EXPECT_LE(socket->getConnectEndTime(), finishedAt);
115  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
116 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::chrono::steady_clock::time_point now()
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
TEST ( AsyncSocketTest  ,
ConnectRefused   
)

Test connecting to a server that isn't listening

Definition at line 143 of file AsyncSocketTest2.cpp.

References addr, ConnCallback::exception, EXPECT_EQ, EXPECT_LE, folly::AsyncSocketException::getType(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::AsyncSocketException::NOT_OPEN, folly::netops::socket(), ConnCallback::state, and folly::STATE_FAILED.

143  {
144  EventBase evb;
145 
146  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
147 
148  // Hopefully nothing is actually listening on this address
149  folly::SocketAddress addr("127.0.0.1", 65535);
150  ConnCallback cb;
151  socket->connect(&cb, addr, 30);
152 
153  evb.loop();
154 
156  EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
157  EXPECT_LE(0, socket->getConnectTime().count());
158  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
159 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
folly::AsyncSocketException exception
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
AsyncSocketExceptionType getType() const noexcept
ThreadPoolListHook * addr
TEST ( AsyncSocketTest  ,
ConnectTimeout   
)

Test connection timeout

Definition at line 164 of file AsyncSocketTest2.cpp.

References addr, ASSERT_EQ, ConnCallback::exception, EXPECT_EQ, EXPECT_LE, folly::AsyncSocketException::getType(), folly::SocketAddressTestHelper::isIPv4Enabled(), folly::SocketAddressTestHelper::isIPv6Enabled(), folly::SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4, folly::SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::AsyncSocketException::NOT_OPEN, SKIP, folly::netops::socket(), ConnCallback::state, folly::STATE_FAILED, and folly::AsyncSocketException::TIMED_OUT.

164  {
165  EventBase evb;
166 
167  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
168 
169  // Try connecting to server that won't respond.
170  //
171  // This depends somewhat on the network where this test is run.
172  // Hopefully this IP will be routable but unresponsive.
173  // (Alternatively, we could try listening on a local raw socket, but that
174  // normally requires root privileges.)
175  auto host = SocketAddressTestHelper::isIPv6Enabled()
176  ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
177  : SocketAddressTestHelper::isIPv4Enabled()
178  ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
179  : nullptr;
180  SocketAddress addr(host, 65535);
181  ConnCallback cb;
182  socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
183 
184  evb.loop();
185 
187  if (cb.exception.getType() == AsyncSocketException::NOT_OPEN) {
188  // This can happen if we could not route to the IP address picked above.
189  // In this case the connect will fail immediately rather than timing out.
190  // Just skip the test in this case.
191  SKIP() << "do not have a routable but unreachable IP address";
192  }
193  ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
194 
195  // Verify that we can still get the peer address after a timeout.
196  // Use case is if the client was created from a client pool, and we want
197  // to log which peer failed.
199  socket->getPeerAddress(&peer);
200  ASSERT_EQ(peer, addr);
201  EXPECT_LE(0, socket->getConnectTime().count());
202  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
203 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
folly::AsyncSocketException exception
#define SKIP()
Definition: TestUtils.h:55
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
AsyncSocketExceptionType getType() const noexcept
ThreadPoolListHook * addr
TEST ( AsyncSocketTest  ,
ConnectAndClose   
)

Test calling close() immediately after connect()

Definition at line 323 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, folly::STATE_FAILED, and folly::STATE_SUCCEEDED.

323  {
324  TestServer server;
325 
326  // Connect using a AsyncSocket
327  EventBase evb;
328  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
329  ConnCallback ccb;
330  socket->connect(&ccb, server.getAddress(), 30);
331 
332  // Hopefully the connect didn't succeed immediately.
333  // If it did, we can't exercise the close-while-connecting code path.
334  if (ccb.state == STATE_SUCCEEDED) {
335  LOG(INFO) << "connect() succeeded immediately; aborting test "
336  "of close-during-connect behavior";
337  return;
338  }
339 
340  socket->close();
341 
342  // Loop, although there shouldn't be anything to do.
343  evb.loop();
344 
345  // Make sure the connection was aborted
347 
348  ASSERT_TRUE(socket->isClosedBySelf());
349  ASSERT_FALSE(socket->isClosedByPeer());
350 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ConnectAndCloseNow   
)

Test calling closeNow() immediately after connect()

This should be identical to the normal close behavior.

Definition at line 357 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, folly::STATE_FAILED, and folly::STATE_SUCCEEDED.

357  {
358  TestServer server;
359 
360  // Connect using a AsyncSocket
361  EventBase evb;
362  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
363  ConnCallback ccb;
364  socket->connect(&ccb, server.getAddress(), 30);
365 
366  // Hopefully the connect didn't succeed immediately.
367  // If it did, we can't exercise the close-while-connecting code path.
368  if (ccb.state == STATE_SUCCEEDED) {
369  LOG(INFO) << "connect() succeeded immediately; aborting test "
370  "of closeNow()-during-connect behavior";
371  return;
372  }
373 
374  socket->closeNow();
375 
376  // Loop, although there shouldn't be anything to do.
377  evb.loop();
378 
379  // Make sure the connection was aborted
381 
382  ASSERT_TRUE(socket->isClosedBySelf());
383  ASSERT_FALSE(socket->isClosedByPeer());
384 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ConnectWriteAndCloseNow   
)

Test calling both write() and closeNow() immediately after connecting, without waiting for connect to finish.

This should abort the pending write.

Definition at line 392 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::STATE_FAILED, and folly::STATE_SUCCEEDED.

392  {
393  TestServer server;
394 
395  // connect()
396  EventBase evb;
397  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
398  ConnCallback ccb;
399  socket->connect(&ccb, server.getAddress(), 30);
400 
401  // Hopefully the connect didn't succeed immediately.
402  // If it did, we can't exercise the close-while-connecting code path.
403  if (ccb.state == STATE_SUCCEEDED) {
404  LOG(INFO) << "connect() succeeded immediately; aborting test "
405  "of write-during-connect behavior";
406  return;
407  }
408 
409  // write()
410  char buf[128];
411  memset(buf, 'a', sizeof(buf));
412  WriteCallback wcb;
413  socket->write(&wcb, buf, sizeof(buf));
414 
415  // close()
416  socket->closeNow();
417 
418  // Loop, although there shouldn't be anything to do.
419  evb.loop();
420 
423 
424  ASSERT_TRUE(socket->isClosedBySelf());
425  ASSERT_FALSE(socket->isClosedByPeer());
426 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ConnectReadAndClose   
)

Test installing a read callback and then closing immediately before the connect attempt finishes.

Definition at line 477 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, folly::ReadCallbackBase::state, folly::STATE_FAILED, and folly::STATE_SUCCEEDED.

477  {
478  TestServer server;
479 
480  // connect()
481  EventBase evb;
482  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
483  ConnCallback ccb;
484  socket->connect(&ccb, server.getAddress(), 30);
485 
486  // Hopefully the connect didn't succeed immediately.
487  // If it did, we can't exercise the close-while-connecting code path.
488  if (ccb.state == STATE_SUCCEEDED) {
489  LOG(INFO) << "connect() succeeded immediately; aborting test "
490  "of read-during-connect behavior";
491  return;
492  }
493 
494  ReadCallback rcb;
495  socket->setReadCB(&rcb);
496 
497  // close()
498  socket->close();
499 
500  // Loop, although there shouldn't be anything to do.
501  evb.loop();
502 
503  ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
504  ASSERT_EQ(rcb.buffers.size(), 0);
505  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
506 
507  ASSERT_TRUE(socket->isClosedBySelf());
508  ASSERT_FALSE(socket->isClosedByPeer());
509 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::vector< Buffer > buffers
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ConnectWriteAndShutdownWrite   
)

Test writing to the socket then shutting down writes before the connect attempt finishes.

Definition at line 579 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, folly::ReadCallback::buffers, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::poll(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_SUCCEEDED, uint32_t, and uint8_t.

579  {
580  TestServer server;
581 
582  // connect()
583  EventBase evb;
584  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
585  ConnCallback ccb;
586  socket->connect(&ccb, server.getAddress(), 30);
587 
588  // Hopefully the connect didn't succeed immediately.
589  // If it did, we can't exercise the write-while-connecting code path.
590  if (ccb.state == STATE_SUCCEEDED) {
591  LOG(INFO) << "connect() succeeded immediately; skipping test";
592  return;
593  }
594 
595  // Ask to write some data
596  char wbuf[128];
597  memset(wbuf, 'a', sizeof(wbuf));
598  WriteCallback wcb;
599  socket->write(&wcb, wbuf, sizeof(wbuf));
600  socket->shutdownWrite();
601 
602  // Shutdown writes
603  socket->shutdownWrite();
604 
605  // Even though we haven't looped yet, we should be able to accept
606  // the connection.
607  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
608 
609  // Since the connection is still in progress, there should be no data to
610  // read yet. Verify that the accepted socket is not readable.
611  struct pollfd fds[1];
612  fds[0].fd = acceptedSocket->getSocketFD();
613  fds[0].events = POLLIN;
614  fds[0].revents = 0;
615  int rc = poll(fds, 1, 0);
616  ASSERT_EQ(rc, 0);
617 
618  // Write data to the accepted socket
619  uint8_t acceptedWbuf[192];
620  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
621  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
622  acceptedSocket->flush();
623 
624  // Loop
625  evb.loop();
626 
627  // The loop should have completed the connection, written the queued data,
628  // and shutdown writes on the socket.
629  //
630  // Check that the connection was completed successfully and that the write
631  // callback succeeded.
634 
635  // Check that we can read the data that was written to the socket, and that
636  // we see an EOF, since its socket was half-shutdown.
637  uint8_t readbuf[sizeof(wbuf)];
638  acceptedSocket->readAll(readbuf, sizeof(readbuf));
639  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
640  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
641  ASSERT_EQ(bytesRead, 0);
642 
643  // Close the accepted socket. This will cause it to see EOF
644  // and uninstall the read callback when we loop next.
645  acceptedSocket->close();
646 
647  // Install a read callback, then loop again.
648  ReadCallback rcb;
649  socket->setReadCB(&rcb);
650  evb.loop();
651 
652  // This loop should have read the data and seen the EOF
654  ASSERT_EQ(rcb.buffers.size(), 1);
655  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
656  ASSERT_EQ(
657  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
658 
659  ASSERT_FALSE(socket->isClosedBySelf());
660  ASSERT_FALSE(socket->isClosedByPeer());
661 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
std::vector< Buffer > buffers
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
TEST ( AsyncSocketTest  ,
ConnectReadWriteAndShutdownWrite   
)

Test reading, writing, and shutting down writes before the connect attempt finishes.

Definition at line 667 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::poll(), folly::shutdown(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_SUCCEEDED, uint32_t, and uint8_t.

667  {
668  TestServer server;
669 
670  // connect()
671  EventBase evb;
672  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
673  ConnCallback ccb;
674  socket->connect(&ccb, server.getAddress(), 30);
675 
676  // Hopefully the connect didn't succeed immediately.
677  // If it did, we can't exercise the write-while-connecting code path.
678  if (ccb.state == STATE_SUCCEEDED) {
679  LOG(INFO) << "connect() succeeded immediately; skipping test";
680  return;
681  }
682 
683  // Install a read callback
684  ReadCallback rcb;
685  socket->setReadCB(&rcb);
686 
687  // Ask to write some data
688  char wbuf[128];
689  memset(wbuf, 'a', sizeof(wbuf));
690  WriteCallback wcb;
691  socket->write(&wcb, wbuf, sizeof(wbuf));
692 
693  // Shutdown writes
694  socket->shutdownWrite();
695 
696  // Even though we haven't looped yet, we should be able to accept
697  // the connection.
698  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
699 
700  // Since the connection is still in progress, there should be no data to
701  // read yet. Verify that the accepted socket is not readable.
702  struct pollfd fds[1];
703  fds[0].fd = acceptedSocket->getSocketFD();
704  fds[0].events = POLLIN;
705  fds[0].revents = 0;
706  int rc = poll(fds, 1, 0);
707  ASSERT_EQ(rc, 0);
708 
709  // Write data to the accepted socket
710  uint8_t acceptedWbuf[192];
711  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
712  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
713  acceptedSocket->flush();
714  // Shutdown writes to the accepted socket. This will cause it to see EOF
715  // and uninstall the read callback.
716  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
717 
718  // Loop
719  evb.loop();
720 
721  // The loop should have completed the connection, written the queued data,
722  // shutdown writes on the socket, read the data we wrote to it, and see the
723  // EOF.
724  //
725  // Check that the connection was completed successfully and that the read
726  // and write callbacks were invoked as expected.
729  ASSERT_EQ(rcb.buffers.size(), 1);
730  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
731  ASSERT_EQ(
732  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
734 
735  // Check that we can read the data that was written to the socket, and that
736  // we see an EOF, since its socket was half-shutdown.
737  uint8_t readbuf[sizeof(wbuf)];
738  acceptedSocket->readAll(readbuf, sizeof(readbuf));
739  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
740  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
741  ASSERT_EQ(bytesRead, 0);
742 
743  // Fully close both sockets
744  acceptedSocket->close();
745  socket->close();
746 
747  ASSERT_FALSE(socket->isClosedBySelf());
748  ASSERT_TRUE(socket->isClosedByPeer());
749 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
std::vector< Buffer > buffers
StateEnum state
void shutdown(Counter &)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ConnectReadWriteAndShutdownWriteNow   
)

Test reading, writing, and calling shutdownWriteNow() before the connect attempt finishes.

Definition at line 755 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, WriteCallback::bytesWritten, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::poll(), folly::shutdown(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_FAILED, folly::STATE_SUCCEEDED, uint32_t, and uint8_t.

755  {
756  TestServer server;
757 
758  // connect()
759  EventBase evb;
760  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
761  ConnCallback ccb;
762  socket->connect(&ccb, server.getAddress(), 30);
763 
764  // Hopefully the connect didn't succeed immediately.
765  // If it did, we can't exercise the write-while-connecting code path.
766  if (ccb.state == STATE_SUCCEEDED) {
767  LOG(INFO) << "connect() succeeded immediately; skipping test";
768  return;
769  }
770 
771  // Install a read callback
772  ReadCallback rcb;
773  socket->setReadCB(&rcb);
774 
775  // Ask to write some data
776  char wbuf[128];
777  memset(wbuf, 'a', sizeof(wbuf));
778  WriteCallback wcb;
779  socket->write(&wcb, wbuf, sizeof(wbuf));
780 
781  // Shutdown writes immediately.
782  // This should immediately discard the data that we just tried to write.
783  socket->shutdownWriteNow();
784 
785  // Verify that writeError() was invoked on the write callback.
787  ASSERT_EQ(wcb.bytesWritten, 0);
788 
789  // Even though we haven't looped yet, we should be able to accept
790  // the connection.
791  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
792 
793  // Since the connection is still in progress, there should be no data to
794  // read yet. Verify that the accepted socket is not readable.
795  struct pollfd fds[1];
796  fds[0].fd = acceptedSocket->getSocketFD();
797  fds[0].events = POLLIN;
798  fds[0].revents = 0;
799  int rc = poll(fds, 1, 0);
800  ASSERT_EQ(rc, 0);
801 
802  // Write data to the accepted socket
803  uint8_t acceptedWbuf[192];
804  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
805  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
806  acceptedSocket->flush();
807  // Shutdown writes to the accepted socket. This will cause it to see EOF
808  // and uninstall the read callback.
809  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
810 
811  // Loop
812  evb.loop();
813 
814  // The loop should have completed the connection, written the queued data,
815  // shutdown writes on the socket, read the data we wrote to it, and see the
816  // EOF.
817  //
818  // Check that the connection was completed successfully and that the read
819  // callback was invoked as expected.
822  ASSERT_EQ(rcb.buffers.size(), 1);
823  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
824  ASSERT_EQ(
825  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
826 
827  // Since we used shutdownWriteNow(), it should have discarded all pending
828  // write data. Verify we see an immediate EOF when reading from the accepted
829  // socket.
830  uint8_t readbuf[sizeof(wbuf)];
831  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
832  ASSERT_EQ(bytesRead, 0);
833 
834  // Fully close both sockets
835  acceptedSocket->close();
836  socket->close();
837 
838  ASSERT_FALSE(socket->isClosedBySelf());
839  ASSERT_TRUE(socket->isClosedByPeer());
840 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
std::vector< Buffer > buffers
StateEnum state
void shutdown(Counter &)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
std::atomic< size_t > bytesWritten
TEST ( AsyncSocketTest  ,
ConnectCallbackWrite   
)

Definition at line 954 of file AsyncSocketTest2.cpp.

References testConnectOptWrite().

954  {
955  // Test using small writes that should both succeed immediately
956  testConnectOptWrite(100, 200);
957 
958  // Test using a large buffer in the connect callback, that should block
959  const size_t largeSize = 32 * 1024 * 1024;
960  testConnectOptWrite(100, largeSize);
961 
962  // Test using a large initial write
963  testConnectOptWrite(largeSize, 100);
964 
965  // Test using two large buffers
966  testConnectOptWrite(largeSize, largeSize);
967 
968  // Test a small write in the connect callback,
969  // but no immediate write before connect completes
970  testConnectOptWrite(0, 64);
971 
972  // Test a large write in the connect callback,
973  // but no immediate write before connect completes
974  testConnectOptWrite(0, largeSize);
975 
976  // Test connect, a small write, then immediately call close() before connect
977  // completes
978  testConnectOptWrite(211, 0, true);
979 
980  // Test connect, a large immediate write (that will block), then immediately
981  // call close() before connect completes
982  testConnectOptWrite(largeSize, 0, true);
983 }
void testConnectOptWrite(size_t size1, size_t size2, bool close=false)
TEST ( AsyncSocketTest  ,
WriteNullCallback   
)

Test writing using a nullptr callback

Definition at line 992 of file AsyncSocketTest2.cpp.

References ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), and TestServer::verifyConnection().

992  {
993  TestServer server;
994 
995  // connect()
996  EventBase evb;
997  std::shared_ptr<AsyncSocket> socket =
998  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
999  evb.loop(); // loop until the socket is connected
1000 
1001  // write() with a nullptr callback
1002  char buf[128];
1003  memset(buf, 'a', sizeof(buf));
1004  socket->write(nullptr, buf, sizeof(buf));
1005 
1006  evb.loop(); // loop until the data is sent
1007 
1008  // Make sure the server got a connection and received the data
1009  socket->close();
1010  server.verifyConnection(buf, sizeof(buf));
1011 
1012  ASSERT_TRUE(socket->isClosedBySelf());
1013  ASSERT_FALSE(socket->isClosedByPeer());
1014 }
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void verifyConnection(const char *buf, size_t len)
TEST ( AsyncSocketTest  ,
WriteTimeout   
)

Test writing with a send timeout

Definition at line 1019 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::test::end(), TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), start, folly::STATE_FAILED, T_CHECK_TIMEOUT, folly::AsyncSocketException::TIMED_OUT, folly::detail::timeout, and uint32_t.

1019  {
1020  TestServer server;
1021 
1022  // connect()
1023  EventBase evb;
1024  std::shared_ptr<AsyncSocket> socket =
1025  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1026  evb.loop(); // loop until the socket is connected
1027 
1028  // write() a large chunk of data, with no-one on the other end reading.
1029  // Tricky: the kernel caches the connection metrics for recently-used
1030  // routes (see tcp_no_metrics_save) so a freshly opened connection can
1031  // have a send buffer size bigger than wmem_default. This makes the test
1032  // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1033  size_t writeLength = 32 * 1024 * 1024;
1034  uint32_t timeout = 200;
1035  socket->setSendTimeout(timeout);
1036  std::unique_ptr<char[]> buf(new char[writeLength]);
1037  memset(buf.get(), 'a', writeLength);
1038  WriteCallback wcb;
1039  socket->write(&wcb, buf.get(), writeLength);
1040 
1041  TimePoint start;
1042  evb.loop();
1043  TimePoint end;
1044 
1045  // Make sure the write attempt timed out as requested
1046  ASSERT_EQ(wcb.state, STATE_FAILED);
1047  ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1048 
1049  // Check that the write timed out within a reasonable period of time.
1050  // We don't check for exactly the specified timeout, since AsyncSocket only
1051  // times out when it hasn't made progress for that period of time.
1052  //
1053  // On linux, the first write sends a few hundred kb of data, then blocks for
1054  // writability, and then unblocks again after 40ms and is able to write
1055  // another smaller of data before blocking permanently. Therefore it doesn't
1056  // time out until 40ms + timeout.
1057  //
1058  // I haven't fully verified the cause of this, but I believe it probably
1059  // occurs because the receiving end delays sending an ack for up to 40ms.
1060  // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1061  // the ack, it can send some more data. However, after that point the
1062  // receiver's kernel buffer is full. This 40ms delay happens even with
1063  // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1064  // kernel may be automatically disabling TCP_QUICKACK after receiving some
1065  // data.
1066  //
1067  // For now, we simply check that the timeout occurred within 160ms of
1068  // the requested value.
1069  T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1070 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
auto start
const folly::SocketAddress & getAddress() const
#define T_CHECK_TIMEOUT(start, end, expectedMS,...)
Definition: Util.h:38
TEST ( AsyncSocketTest  ,
WritePipeError   
)

Test writing to a socket that the remote endpoint has closed

Definition at line 1075 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, TestServer::getAddress(), folly::AsyncSocketException::INTERNAL_ERROR, folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), and folly::STATE_FAILED.

1075  {
1076  TestServer server;
1077 
1078  // connect()
1079  EventBase evb;
1080  std::shared_ptr<AsyncSocket> socket =
1081  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1082  socket->setSendTimeout(1000);
1083  evb.loop(); // loop until the socket is connected
1084 
1085  // accept and immediately close the socket
1086  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1087  acceptedSocket->close();
1088 
1089  // write() a large chunk of data
1090  size_t writeLength = 32 * 1024 * 1024;
1091  std::unique_ptr<char[]> buf(new char[writeLength]);
1092  memset(buf.get(), 'a', writeLength);
1093  WriteCallback wcb;
1094  socket->write(&wcb, buf.get(), writeLength);
1095 
1096  evb.loop();
1097 
1098  // Make sure the write failed.
1099  // It would be nice if AsyncSocketException could convey the errno value,
1100  // so that we could check for EPIPE
1101  ASSERT_EQ(wcb.state, STATE_FAILED);
1102  ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::INTERNAL_ERROR);
1103 
1104  ASSERT_FALSE(socket->isClosedBySelf());
1105  ASSERT_FALSE(socket->isClosedByPeer());
1106 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
TEST ( AsyncSocketTest  ,
WriteAfterReadEOF   
)

Test writing to a socket that has its read side closed

Definition at line 1111 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), WriteCallback::state, folly::ReadCallbackBase::state, and folly::STATE_SUCCEEDED.

1111  {
1112  TestServer server;
1113 
1114  // connect()
1115  EventBase evb;
1116  std::shared_ptr<AsyncSocket> socket =
1117  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1118  evb.loop(); // loop until the socket is connected
1119 
1120  // Accept the connection
1121  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1122  ReadCallback rcb;
1123  acceptedSocket->setReadCB(&rcb);
1124 
1125  // Shutdown the write side of client socket (read side of server socket)
1126  socket->shutdownWrite();
1127  evb.loop();
1128 
1129  // Check that accepted socket is still writable
1130  ASSERT_FALSE(acceptedSocket->good());
1131  ASSERT_TRUE(acceptedSocket->writable());
1132 
1133  // Write data to accepted socket
1134  constexpr size_t simpleBufLength = 5;
1135  char simpleBuf[simpleBufLength];
1136  memset(simpleBuf, 'a', simpleBufLength);
1137  WriteCallback wcb;
1138  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1139  evb.loop();
1140 
1141  // Make sure we were able to write even after getting a read EOF
1142  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
1144 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
WriteErrorCallbackBytesWritten   
)

Test that bytes written is correctly computed in case of write failure

Definition at line 1149 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_GE, ASSERT_LE, EXPECT_EQ, TestServer::getAddress(), folly::EventBase::loopForever(), folly::AsyncSocket::newSocket(), now(), folly::EventBase::runInEventBaseThreadAndWait(), folly::netops::socket(), folly::STATE_FAILED, folly::EventBase::terminateLoopSoon(), uint8_t, and folly::fibers::yield().

1149  {
1150  // Send and receive buffer sizes for the sockets.
1151  // Note that Linux will double this value to allow space for bookkeeping
1152  // overhead.
1153  constexpr size_t kSockBufSize = 8 * 1024;
1154  constexpr size_t kEffectiveSockBufSize = 2 * kSockBufSize;
1155 
1156  TestServer server(false, kSockBufSize);
1157 
1158  AsyncSocket::OptionMap options{
1159  {{SOL_SOCKET, SO_SNDBUF}, int(kSockBufSize)},
1160  {{SOL_SOCKET, SO_RCVBUF}, int(kSockBufSize)},
1161  {{IPPROTO_TCP, TCP_NODELAY}, 1},
1162  };
1163 
1164  // The current thread will be used by the receiver - use a separate thread
1165  // for the sender.
1166  EventBase senderEvb;
1167  std::thread senderThread([&]() { senderEvb.loopForever(); });
1168 
1169  ConnCallback ccb;
1170  std::shared_ptr<AsyncSocket> socket;
1171 
1172  senderEvb.runInEventBaseThreadAndWait([&]() {
1173  socket = AsyncSocket::newSocket(&senderEvb);
1174  socket->connect(&ccb, server.getAddress(), 30, options);
1175  });
1176 
1177  // accept the socket on the server side
1178  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1179 
1180  // Send a big (100KB) write so that it is partially written.
1181  constexpr size_t kSendSize = 100 * 1024;
1182  auto const sendBuf = std::vector<char>(kSendSize, 'a');
1183 
1184  WriteCallback wcb;
1185 
1186  senderEvb.runInEventBaseThreadAndWait(
1187  [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
1188 
1189  // Read 20KB of data from the socket to allow the sender to send a bit more
1190  // data after it initially blocks.
1191  constexpr size_t kRecvSize = 20 * 1024;
1192  uint8_t recvBuf[kRecvSize];
1193  auto bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1194  ASSERT_EQ(kRecvSize, bytesRead);
1195  EXPECT_EQ(0, memcmp(recvBuf, sendBuf.data(), bytesRead));
1196 
1197  // We should be able to send at least the amount of data received plus the
1198  // send buffer size. In practice we should probably be able to send
1199  constexpr size_t kMinExpectedBytesWritten = kRecvSize + kSockBufSize;
1200 
1201  // We shouldn't be able to send more than the amount of data received plus
1202  // the send buffer size of the sending socket (kEffectiveSockBufSize) plus
1203  // the receive buffer size on the receiving socket (kEffectiveSockBufSize)
1204  constexpr size_t kMaxExpectedBytesWritten =
1205  kRecvSize + kEffectiveSockBufSize + kEffectiveSockBufSize;
1206  static_assert(
1207  kMaxExpectedBytesWritten < kSendSize, "kSendSize set too small");
1208 
1209  // Need to delay after receiving 20KB and before closing the receive side so
1210  // that the send side has a chance to fill the send buffer past.
1211  using clock = std::chrono::steady_clock;
1212  auto const deadline = clock::now() + std::chrono::seconds(2);
1213  while (wcb.bytesWritten < kMinExpectedBytesWritten &&
1214  clock::now() < deadline) {
1216  }
1217  acceptedSocket->closeWithReset();
1218 
1219  senderEvb.terminateLoopSoon();
1220  senderThread.join();
1221 
1222  ASSERT_EQ(STATE_FAILED, wcb.state);
1223  ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
1224  ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
1225 }
#define ASSERT_GE(val1, val2)
Definition: gtest.h:1972
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::chrono::steady_clock::time_point now()
#define ASSERT_LE(val1, val2)
Definition: gtest.h:1964
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
void terminateLoopSoon()
Definition: EventBase.cpp:493
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
TEST ( AsyncSocketTest  ,
WriteIOBuf   
)

Test writing a mix of simple buffers and IOBufs

Definition at line 1230 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), folly::IOBuf::append(), folly::IOBuf::appendChain(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, folly::IOBuf::clone(), folly::IOBuf::create(), EXPECT_FALSE, EXPECT_TRUE, TestServer::getAddress(), folly::EventBase::loop(), folly::gen::move, folly::AsyncSocket::newSocket(), folly::netops::socket(), WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_SUCCEEDED, and folly::IOBuf::writableData().

1230  {
1231  TestServer server;
1232 
1233  // connect()
1234  EventBase evb;
1235  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1236  ConnCallback ccb;
1237  socket->connect(&ccb, server.getAddress(), 30);
1238 
1239  // Accept the connection
1240  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1241  ReadCallback rcb;
1242  acceptedSocket->setReadCB(&rcb);
1243 
1244  // Check if EOR tracking flag can be set and reset.
1245  EXPECT_FALSE(socket->isEorTrackingEnabled());
1246  socket->setEorTracking(true);
1247  EXPECT_TRUE(socket->isEorTrackingEnabled());
1248  socket->setEorTracking(false);
1249  EXPECT_FALSE(socket->isEorTrackingEnabled());
1250 
1251  // Write a simple buffer to the socket
1252  constexpr size_t simpleBufLength = 5;
1253  char simpleBuf[simpleBufLength];
1254  memset(simpleBuf, 'a', simpleBufLength);
1255  WriteCallback wcb;
1256  socket->write(&wcb, simpleBuf, simpleBufLength);
1257 
1258  // Write a single-element IOBuf chain
1259  size_t buf1Length = 7;
1260  unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1261  memset(buf1->writableData(), 'b', buf1Length);
1262  buf1->append(buf1Length);
1263  unique_ptr<IOBuf> buf1Copy(buf1->clone());
1264  WriteCallback wcb2;
1265  socket->writeChain(&wcb2, std::move(buf1));
1266 
1267  // Write a multiple-element IOBuf chain
1268  size_t buf2Length = 11;
1269  unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1270  memset(buf2->writableData(), 'c', buf2Length);
1271  buf2->append(buf2Length);
1272  size_t buf3Length = 13;
1273  unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1274  memset(buf3->writableData(), 'd', buf3Length);
1275  buf3->append(buf3Length);
1276  buf2->appendChain(std::move(buf3));
1277  unique_ptr<IOBuf> buf2Copy(buf2->clone());
1278  buf2Copy->coalesce();
1279  WriteCallback wcb3;
1280  socket->writeChain(&wcb3, std::move(buf2));
1281  socket->shutdownWrite();
1282 
1283  // Let the reads and writes run to completion
1284  evb.loop();
1285 
1287  ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1289 
1290  // Make sure the reader got the right data in the right order
1292  ASSERT_EQ(rcb.buffers.size(), 1);
1293  ASSERT_EQ(
1294  rcb.buffers[0].length,
1295  simpleBufLength + buf1Length + buf2Length + buf3Length);
1296  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1297  ASSERT_EQ(
1298  memcmp(
1299  rcb.buffers[0].buffer + simpleBufLength,
1300  buf1Copy->data(),
1301  buf1Copy->length()),
1302  0);
1303  ASSERT_EQ(
1304  memcmp(
1305  rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1306  buf2Copy->data(),
1307  buf2Copy->length()),
1308  0);
1309 
1310  acceptedSocket->close();
1311  socket->close();
1312 
1313  ASSERT_TRUE(socket->isClosedBySelf());
1314  ASSERT_FALSE(socket->isClosedByPeer());
1315 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::vector< Buffer > buffers
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
WriteIOBufCorked   
)

Definition at line 1317 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), folly::IOBuf::append(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, folly::IOBuf::create(), WriteCallback::exception, TestServer::getAddress(), folly::EventBase::loop(), folly::gen::move, folly::AsyncSocket::newSocket(), folly::AsyncTimeout::scheduleTimeout(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_SUCCEEDED, and folly::IOBuf::writableData().

1317  {
1318  TestServer server;
1319 
1320  // connect()
1321  EventBase evb;
1322  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1323  ConnCallback ccb;
1324  socket->connect(&ccb, server.getAddress(), 30);
1325 
1326  // Accept the connection
1327  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1328  ReadCallback rcb;
1329  acceptedSocket->setReadCB(&rcb);
1330 
1331  // Do three writes, 100ms apart, with the "cork" flag set
1332  // on the second write. The reader should see the first write
1333  // arrive by itself, followed by the second and third writes
1334  // arriving together.
1335  size_t buf1Length = 5;
1336  unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1337  memset(buf1->writableData(), 'a', buf1Length);
1338  buf1->append(buf1Length);
1339  size_t buf2Length = 7;
1340  unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1341  memset(buf2->writableData(), 'b', buf2Length);
1342  buf2->append(buf2Length);
1343  size_t buf3Length = 11;
1344  unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1345  memset(buf3->writableData(), 'c', buf3Length);
1346  buf3->append(buf3Length);
1347  WriteCallback wcb1;
1348  socket->writeChain(&wcb1, std::move(buf1));
1349  WriteCallback wcb2;
1350  DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1351  write2.scheduleTimeout(100);
1352  WriteCallback wcb3;
1353  DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1354  write3.scheduleTimeout(140);
1355 
1356  evb.loop();
1360  if (wcb3.state != STATE_SUCCEEDED) {
1361  throw(wcb3.exception);
1362  }
1364 
1365  // Make sure the reader got the data with the right grouping
1367  ASSERT_EQ(rcb.buffers.size(), 2);
1368  ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1369  ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1370 
1371  acceptedSocket->close();
1372  socket->close();
1373 
1374  ASSERT_TRUE(socket->isClosedBySelf());
1375  ASSERT_FALSE(socket->isClosedByPeer());
1376 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
folly::AsyncSocketException exception
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::vector< Buffer > buffers
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ZeroLengthWrite   
)

Test performing a zero-length write

Definition at line 1381 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), WriteCallback::state, and folly::STATE_SUCCEEDED.

1381  {
1382  TestServer server;
1383 
1384  // connect()
1385  EventBase evb;
1386  std::shared_ptr<AsyncSocket> socket =
1387  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1388  evb.loop(); // loop until the socket is connected
1389 
1390  auto acceptedSocket = server.acceptAsync(&evb);
1391  ReadCallback rcb;
1392  acceptedSocket->setReadCB(&rcb);
1393 
1394  size_t len1 = 1024 * 1024;
1395  size_t len2 = 1024 * 1024;
1396  std::unique_ptr<char[]> buf(new char[len1 + len2]);
1397  memset(buf.get(), 'a', len1);
1398  memset(buf.get(), 'b', len2);
1399 
1400  WriteCallback wcb1;
1401  WriteCallback wcb2;
1402  WriteCallback wcb3;
1403  WriteCallback wcb4;
1404  socket->write(&wcb1, buf.get(), 0);
1405  socket->write(&wcb2, buf.get(), len1);
1406  socket->write(&wcb3, buf.get() + len1, 0);
1407  socket->write(&wcb4, buf.get() + len1, len2);
1408  socket->close();
1409 
1410  evb.loop(); // loop until the data is sent
1411 
1412  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1416  rcb.verifyData(buf.get(), len1 + len2);
1417 
1418  ASSERT_TRUE(socket->isClosedBySelf());
1419  ASSERT_FALSE(socket->isClosedByPeer());
1420 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ZeroLengthWritev   
)

Definition at line 1422 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), and folly::STATE_SUCCEEDED.

1422  {
1423  TestServer server;
1424 
1425  // connect()
1426  EventBase evb;
1427  std::shared_ptr<AsyncSocket> socket =
1428  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1429  evb.loop(); // loop until the socket is connected
1430 
1431  auto acceptedSocket = server.acceptAsync(&evb);
1432  ReadCallback rcb;
1433  acceptedSocket->setReadCB(&rcb);
1434 
1435  size_t len1 = 1024 * 1024;
1436  size_t len2 = 1024 * 1024;
1437  std::unique_ptr<char[]> buf(new char[len1 + len2]);
1438  memset(buf.get(), 'a', len1);
1439  memset(buf.get(), 'b', len2);
1440 
1441  WriteCallback wcb;
1442  constexpr size_t iovCount = 4;
1443  struct iovec iov[iovCount];
1444  iov[0].iov_base = buf.get();
1445  iov[0].iov_len = len1;
1446  iov[1].iov_base = buf.get() + len1;
1447  iov[1].iov_len = 0;
1448  iov[2].iov_base = buf.get() + len1;
1449  iov[2].iov_len = len2;
1450  iov[3].iov_base = buf.get() + len1 + len2;
1451  iov[3].iov_len = 0;
1452 
1453  socket->writev(&wcb, iov, iovCount);
1454  socket->close();
1455  evb.loop(); // loop until the data is sent
1456 
1457  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1458  rcb.verifyData(buf.get(), len1 + len2);
1459 
1460  ASSERT_TRUE(socket->isClosedBySelf());
1461  ASSERT_FALSE(socket->isClosedByPeer());
1462 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST ( AsyncSocketTest  ,
ClosePendingWritesWhileClosing   
)

Test calling close() with pending writes when the socket is already closing.

Definition at line 1471 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::netops::bind(), folly::AsyncSocket::close(), TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, folly::STATE_FAILED, and folly::STATE_SUCCEEDED.

1471  {
1472  TestServer server;
1473 
1474  // connect()
1475  EventBase evb;
1476  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1477  ConnCallback ccb;
1478  socket->connect(&ccb, server.getAddress(), 30);
1479 
1480  // accept the socket on the server side
1481  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1482 
1483  // Loop to ensure the connect has completed
1484  evb.loop();
1485 
1486  // Make sure we are connected
1488 
1489  // Schedule pending writes, until several write attempts have blocked
1490  char buf[128];
1491  memset(buf, 'a', sizeof(buf));
1492  typedef vector<std::shared_ptr<WriteCallback>> WriteCallbackVector;
1493  WriteCallbackVector writeCallbacks;
1494 
1495  writeCallbacks.reserve(5);
1496  while (writeCallbacks.size() < 5) {
1497  std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1498 
1499  socket->write(wcb.get(), buf, sizeof(buf));
1500  if (wcb->state == STATE_SUCCEEDED) {
1501  // Succeeded immediately. Keep performing more writes
1502  continue;
1503  }
1504 
1505  // This write is blocked.
1506  // Have the write callback call close() when writeError() is invoked
1507  wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1508  writeCallbacks.push_back(wcb);
1509  }
1510 
1511  // Call closeNow() to immediately fail the pending writes
1512  socket->closeNow();
1513 
1514  // Make sure writeError() was invoked on all of the pending write callbacks
1515  for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1516  it != writeCallbacks.end();
1517  ++it) {
1518  ASSERT_EQ((*it)->state, STATE_FAILED);
1519  }
1520 
1521  ASSERT_TRUE(socket->isClosedBySelf());
1522  ASSERT_FALSE(socket->isClosedByPeer());
1523 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
int close(NetworkSocket s)
Definition: NetOps.cpp:90
TEST ( AsyncSocket  ,
ConnectReadImmediateRead   
)

Definition at line 1542 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, ASSERT_FALSE, folly::AsyncSocket::connect(), TestServer::getAddress(), AsyncSocketImmediateRead::immediateReadCalled, folly::AsyncSocket::isClosedByPeer(), folly::AsyncSocket::isClosedBySelf(), folly::EventBase::loop(), folly::AsyncSocket::setMaxReadsPerEvent(), folly::AsyncSocket::setReadCB(), folly::netops::socket(), WriteCallback::state, folly::STATE_SUCCEEDED, and folly::AsyncSocket::write().

1542  {
1543  TestServer server;
1544 
1545  const size_t maxBufferSz = 100;
1546  const size_t maxReadsPerEvent = 1;
1547  const size_t expectedDataSz = maxBufferSz * 3;
1548  char expectedData[expectedDataSz];
1549  memset(expectedData, 'j', expectedDataSz);
1550 
1551  EventBase evb;
1552  ReadCallback rcb(maxBufferSz);
1554  socket.connect(nullptr, server.getAddress(), 30);
1555 
1556  evb.loop(); // loop until the socket is connected
1557 
1558  socket.setReadCB(&rcb);
1559  socket.setMaxReadsPerEvent(maxReadsPerEvent);
1560  socket.immediateReadCalled = false;
1561 
1562  auto acceptedSocket = server.acceptAsync(&evb);
1563 
1564  ReadCallback rcbServer;
1565  WriteCallback wcbServer;
1566  rcbServer.dataAvailableCallback = [&]() {
1567  if (rcbServer.dataRead() == expectedDataSz) {
1568  // write back all data read
1569  rcbServer.verifyData(expectedData, expectedDataSz);
1570  acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1571  acceptedSocket->close();
1572  }
1573  };
1574  acceptedSocket->setReadCB(&rcbServer);
1575 
1576  // write data
1577  WriteCallback wcb1;
1578  socket.write(&wcb1, expectedData, expectedDataSz);
1579  evb.loop();
1581  rcb.verifyData(expectedData, expectedDataSz);
1582  ASSERT_EQ(socket.immediateReadCalled, true);
1583 
1584  ASSERT_FALSE(socket.isClosedBySelf());
1585  ASSERT_FALSE(socket.isClosedByPeer());
1586 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
TEST ( AsyncSocket  ,
ConnectReadUninstallRead   
)

Definition at line 1588 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, ASSERT_FALSE, folly::AsyncSocket::connect(), TestServer::getAddress(), AsyncSocketImmediateRead::immediateReadCalled, folly::AsyncSocket::isClosedByPeer(), folly::AsyncSocket::isClosedBySelf(), folly::EventBase::loop(), folly::AsyncSocket::setMaxReadsPerEvent(), folly::AsyncSocket::setReadCB(), folly::netops::socket(), WriteCallback::state, folly::STATE_SUCCEEDED, and folly::AsyncSocket::write().

1588  {
1589  TestServer server;
1590 
1591  const size_t maxBufferSz = 100;
1592  const size_t maxReadsPerEvent = 1;
1593  const size_t expectedDataSz = maxBufferSz * 3;
1594  char expectedData[expectedDataSz];
1595  memset(expectedData, 'k', expectedDataSz);
1596 
1597  EventBase evb;
1598  ReadCallback rcb(maxBufferSz);
1600  socket.connect(nullptr, server.getAddress(), 30);
1601 
1602  evb.loop(); // loop until the socket is connected
1603 
1604  socket.setReadCB(&rcb);
1605  socket.setMaxReadsPerEvent(maxReadsPerEvent);
1606  socket.immediateReadCalled = false;
1607 
1608  auto acceptedSocket = server.acceptAsync(&evb);
1609 
1610  ReadCallback rcbServer;
1611  WriteCallback wcbServer;
1612  rcbServer.dataAvailableCallback = [&]() {
1613  if (rcbServer.dataRead() == expectedDataSz) {
1614  // write back all data read
1615  rcbServer.verifyData(expectedData, expectedDataSz);
1616  acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1617  acceptedSocket->close();
1618  }
1619  };
1620  acceptedSocket->setReadCB(&rcbServer);
1621 
1622  rcb.dataAvailableCallback = [&]() {
1623  // we read data and reset readCB
1624  socket.setReadCB(nullptr);
1625  };
1626 
1627  // write data
1628  WriteCallback wcb;
1629  socket.write(&wcb, expectedData, expectedDataSz);
1630  evb.loop();
1632 
1633  /* we shoud've only read maxBufferSz data since readCallback_
1634  * was reset in dataAvailableCallback */
1635  ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1636  ASSERT_EQ(socket.immediateReadCalled, false);
1637 
1638  ASSERT_FALSE(socket.isClosedBySelf());
1639  ASSERT_FALSE(socket.isClosedByPeer());
1640 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
TEST ( AsyncSocketTest  ,
ServerAcceptOptions   
)

Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set

Definition at line 1661 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::SocketAddress::getAddress(), folly::test::TestAcceptCallback::getEvents(), folly::netops::getsockopt(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::netops::socket(), folly::test::TestAcceptCallback::TYPE_ACCEPT, folly::test::TestAcceptCallback::TYPE_START, folly::test::TestAcceptCallback::TYPE_STOP, and folly::value().

1661  {
1662  EventBase eventBase;
1663 
1664  // Create a server socket
1665  std::shared_ptr<AsyncServerSocket> serverSocket(
1666  AsyncServerSocket::newSocket(&eventBase));
1667  serverSocket->bind(0);
1668  serverSocket->listen(16);
1669  folly::SocketAddress serverAddress;
1670  serverSocket->getAddress(&serverAddress);
1671 
1672  // Add a callback to accept one connection then stop the loop
1673  TestAcceptCallback acceptCallback;
1674  acceptCallback.setConnectionAcceptedFn(
1675  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1676  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1677  });
1678  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1679  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1680  });
1681  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1682  serverSocket->startAccepting();
1683 
1684  // Connect to the server socket
1685  std::shared_ptr<AsyncSocket> socket(
1686  AsyncSocket::newSocket(&eventBase, serverAddress));
1687 
1688  eventBase.loop();
1689 
1690  // Verify that the server accepted a connection
1691  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1692  ASSERT_EQ(
1693  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1694  ASSERT_EQ(
1695  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1696  ASSERT_EQ(
1697  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1698  int fd = acceptCallback.getEvents()->at(1).fd;
1699 
1700  // The accepted connection should already be in non-blocking mode
1701  int flags = fcntl(fd, F_GETFL, 0);
1702  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1703 
1704 #ifndef TCP_NOPUSH
1705  // The accepted connection should already have TCP_NODELAY set
1706  int value;
1707  socklen_t valueLength = sizeof(value);
1708  int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1709  ASSERT_EQ(rc, 0);
1710  ASSERT_EQ(value, 1);
1711 #endif
1712 }
flags
Definition: http_parser.h:127
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::deque< EventInfo > * getEvents()
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
static const char *const value
Definition: Conv.cpp:50
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
TEST ( AsyncSocketTest  ,
RemoveAcceptCallback   
)

Test AsyncServerSocket::removeAcceptCallback()

Definition at line 1717 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::SocketAddress::getAddress(), folly::test::TestAcceptCallback::getEvents(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::test::TestAcceptCallback::TYPE_ACCEPT, folly::test::TestAcceptCallback::TYPE_START, and folly::test::TestAcceptCallback::TYPE_STOP.

1717  {
1718  // Create a new AsyncServerSocket
1719  EventBase eventBase;
1720  std::shared_ptr<AsyncServerSocket> serverSocket(
1721  AsyncServerSocket::newSocket(&eventBase));
1722  serverSocket->bind(0);
1723  serverSocket->listen(16);
1724  folly::SocketAddress serverAddress;
1725  serverSocket->getAddress(&serverAddress);
1726 
1727  // Add several accept callbacks
1728  TestAcceptCallback cb1;
1729  TestAcceptCallback cb2;
1730  TestAcceptCallback cb3;
1731  TestAcceptCallback cb4;
1732  TestAcceptCallback cb5;
1733  TestAcceptCallback cb6;
1734  TestAcceptCallback cb7;
1735 
1736  // Test having callbacks remove other callbacks before them on the list,
1737  // after them on the list, or removing themselves.
1738  //
1739  // Have callback 2 remove callback 3 and callback 5 the first time it is
1740  // called.
1741  int cb2Count = 0;
1743  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1744  std::shared_ptr<AsyncSocket> sock2(AsyncSocket::newSocket(
1745  &eventBase, serverAddress)); // cb2: -cb3 -cb5
1746  });
1748  [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1750  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1751  std::shared_ptr<AsyncSocket> sock3(
1752  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1753  });
1755  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1756  std::shared_ptr<AsyncSocket> sock5(
1757  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1758  });
1760  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1761  if (cb2Count == 0) {
1762  serverSocket->removeAcceptCallback(&cb3, nullptr);
1763  serverSocket->removeAcceptCallback(&cb5, nullptr);
1764  }
1765  ++cb2Count;
1766  });
1767  // Have callback 6 remove callback 4 the first time it is called,
1768  // and destroy the server socket the second time it is called
1769  int cb6Count = 0;
1771  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1772  if (cb6Count == 0) {
1773  serverSocket->removeAcceptCallback(&cb4, nullptr);
1774  std::shared_ptr<AsyncSocket> sock6(
1775  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1776  std::shared_ptr<AsyncSocket> sock7(
1777  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1778  std::shared_ptr<AsyncSocket> sock8(
1779  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1780 
1781  } else {
1782  serverSocket.reset();
1783  }
1784  ++cb6Count;
1785  });
1786  // Have callback 7 remove itself
1788  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1789  serverSocket->removeAcceptCallback(&cb7, nullptr);
1790  });
1791 
1792  serverSocket->addAcceptCallback(&cb1, &eventBase);
1793  serverSocket->addAcceptCallback(&cb2, &eventBase);
1794  serverSocket->addAcceptCallback(&cb3, &eventBase);
1795  serverSocket->addAcceptCallback(&cb4, &eventBase);
1796  serverSocket->addAcceptCallback(&cb5, &eventBase);
1797  serverSocket->addAcceptCallback(&cb6, &eventBase);
1798  serverSocket->addAcceptCallback(&cb7, &eventBase);
1799  serverSocket->startAccepting();
1800 
1801  // Make several connections to the socket
1802  std::shared_ptr<AsyncSocket> sock1(
1803  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1804  std::shared_ptr<AsyncSocket> sock4(
1805  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1806 
1807  // Loop until we are stopped
1808  eventBase.loop();
1809 
1810  // Check to make sure that the expected callbacks were invoked.
1811  //
1812  // NOTE: This code depends on the AsyncServerSocket operating calling all of
1813  // the AcceptCallbacks in round-robin fashion, in the order that they were
1814  // added. The code is implemented this way right now, but the API doesn't
1815  // explicitly require it be done this way. If we change the code not to be
1816  // exactly round robin in the future, we can simplify the test checks here.
1817  // (We'll also need to update the termination code, since we expect cb6 to
1818  // get called twice to terminate the loop.)
1819  ASSERT_EQ(cb1.getEvents()->size(), 4);
1820  ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1821  ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1822  ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
1823  ASSERT_EQ(cb1.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
1824 
1825  ASSERT_EQ(cb2.getEvents()->size(), 4);
1826  ASSERT_EQ(cb2.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1827  ASSERT_EQ(cb2.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1828  ASSERT_EQ(cb2.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
1829  ASSERT_EQ(cb2.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
1830 
1831  ASSERT_EQ(cb3.getEvents()->size(), 2);
1832  ASSERT_EQ(cb3.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1833  ASSERT_EQ(cb3.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP);
1834 
1835  ASSERT_EQ(cb4.getEvents()->size(), 3);
1836  ASSERT_EQ(cb4.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1837  ASSERT_EQ(cb4.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1838  ASSERT_EQ(cb4.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1839 
1840  ASSERT_EQ(cb5.getEvents()->size(), 2);
1841  ASSERT_EQ(cb5.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1842  ASSERT_EQ(cb5.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP);
1843 
1844  ASSERT_EQ(cb6.getEvents()->size(), 4);
1845  ASSERT_EQ(cb6.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1846  ASSERT_EQ(cb6.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1847  ASSERT_EQ(cb6.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
1848  ASSERT_EQ(cb6.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
1849 
1850  ASSERT_EQ(cb7.getEvents()->size(), 3);
1851  ASSERT_EQ(cb7.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1852  ASSERT_EQ(cb7.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1853  ASSERT_EQ(cb7.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1854 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::deque< EventInfo > * getEvents()
socklen_t getAddress(sockaddr_storage *addr) const
TEST ( AsyncSocketTest  ,
OtherThreadAcceptCallback   
)

Test AsyncServerSocket::removeAcceptCallback()

Definition at line 1859 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::SocketAddress::getAddress(), folly::test::TestAcceptCallback::getEvents(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::test::TestAcceptCallback::setAcceptStartedFn(), folly::test::TestAcceptCallback::setAcceptStoppedFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::test::TestAcceptCallback::TYPE_ACCEPT, folly::test::TestAcceptCallback::TYPE_START, and folly::test::TestAcceptCallback::TYPE_STOP.

1859  {
1860  // Create a new AsyncServerSocket
1861  EventBase eventBase;
1862  std::shared_ptr<AsyncServerSocket> serverSocket(
1863  AsyncServerSocket::newSocket(&eventBase));
1864  serverSocket->bind(0);
1865  serverSocket->listen(16);
1866  folly::SocketAddress serverAddress;
1867  serverSocket->getAddress(&serverAddress);
1868 
1869  // Add several accept callbacks
1870  TestAcceptCallback cb1;
1871  auto thread_id = std::this_thread::get_id();
1872  cb1.setAcceptStartedFn([&]() {
1873  CHECK_NE(thread_id, std::this_thread::get_id());
1874  thread_id = std::this_thread::get_id();
1875  });
1877  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1878  ASSERT_EQ(thread_id, std::this_thread::get_id());
1879  serverSocket->removeAcceptCallback(&cb1, &eventBase);
1880  });
1881  cb1.setAcceptStoppedFn(
1882  [&]() { ASSERT_EQ(thread_id, std::this_thread::get_id()); });
1883 
1884  // Test having callbacks remove other callbacks before them on the list,
1885  serverSocket->addAcceptCallback(&cb1, &eventBase);
1886  serverSocket->startAccepting();
1887 
1888  // Make several connections to the socket
1889  std::shared_ptr<AsyncSocket> sock1(
1890  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1891 
1892  // Loop in another thread
1893  auto other = std::thread([&]() { eventBase.loop(); });
1894  other.join();
1895 
1896  // Check to make sure that the expected callbacks were invoked.
1897  //
1898  // NOTE: This code depends on the AsyncServerSocket operating calling all of
1899  // the AcceptCallbacks in round-robin fashion, in the order that they were
1900  // added. The code is implemented this way right now, but the API doesn't
1901  // explicitly require it be done this way. If we change the code not to be
1902  // exactly round robin in the future, we can simplify the test checks here.
1903  // (We'll also need to update the termination code, since we expect cb6 to
1904  // get called twice to terminate the loop.)
1905  ASSERT_EQ(cb1.getEvents()->size(), 3);
1906  ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1907  ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1908  ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1909 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::deque< EventInfo > * getEvents()
void setAcceptStartedFn(const std::function< void()> &fn)
socklen_t getAddress(sockaddr_storage *addr) const
void setAcceptStoppedFn(const std::function< void()> &fn)
TEST ( AsyncSocketTest  ,
DestroyCloseTest   
)

Definition at line 1952 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, TestServer::getAddress(), folly::test::msvcSuppressAbortOnInvalidParams(), folly::AsyncSocket::newSocket(), fizz::detail::read(), and folly::netops::socket().

1952  {
1953  TestServer server;
1954 
1955  // connect()
1956  EventBase clientEB;
1957  EventBase serverEB;
1958  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1959  ConnCallback ccb;
1960  socket->connect(&ccb, server.getAddress(), 30);
1961 
1962  // Accept the connection
1963  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1964  ReadCallback rcb;
1965  acceptedSocket->setReadCB(&rcb);
1966 
1967  // Write a large buffer to the socket that is larger than kernel buffer
1968  size_t simpleBufLength = 5000000;
1969  char* simpleBuf = new char[simpleBufLength];
1970  memset(simpleBuf, 'a', simpleBufLength);
1971  WriteCallback wcb;
1972 
1973  // Let the reads and writes run to completion
1974  int fd = acceptedSocket->getFd();
1975 
1976  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1977  socket.reset();
1978  acceptedSocket.reset();
1979 
1980  // Test that server socket was closed
1982  ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1983  ASSERT_EQ(sz, -1);
1984  ASSERT_EQ(errno, EBADF);
1985  });
1986  delete[] simpleBuf;
1987 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
auto msvcSuppressAbortOnInvalidParams(Func func) -> decltype(func())
Definition: TestUtil.h:166
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
TEST ( AsyncSocketTest  ,
ServerExistingSocket   
)

Test AsyncServerSocket::useExistingSocket()

Definition at line 1992 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_GE, folly::netops::bind(), folly::SocketAddress::getAddress(), folly::netops::listen(), serverSocketSanityTest(), folly::SocketAddress::setFromLocalAddress(), folly::SocketAddress::setPort(), and folly::netops::socket().

1992  {
1993  EventBase eventBase;
1994 
1995  // Test creating a socket, and letting AsyncServerSocket bind and listen
1996  {
1997  // Manually create a socket
1998  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1999  ASSERT_GE(fd, 0);
2000 
2001  // Create a server socket
2002  AsyncServerSocket::UniquePtr serverSocket(
2003  new AsyncServerSocket(&eventBase));
2004  serverSocket->useExistingSocket(fd);
2005  folly::SocketAddress address;
2006  serverSocket->getAddress(&address);
2007  address.setPort(0);
2008  serverSocket->bind(address);
2009  serverSocket->listen(16);
2010 
2011  // Make sure the socket works
2012  serverSocketSanityTest(serverSocket.get());
2013  }
2014 
2015  // Test creating a socket and binding manually,
2016  // then letting AsyncServerSocket listen
2017  {
2018  // Manually create a socket
2019  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2020  ASSERT_GE(fd, 0);
2021  // bind
2022  struct sockaddr_in addr;
2023  addr.sin_family = AF_INET;
2024  addr.sin_port = 0;
2025  addr.sin_addr.s_addr = INADDR_ANY;
2026  ASSERT_EQ(
2027  bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
2028  // Look up the address that we bound to
2029  folly::SocketAddress boundAddress;
2030  boundAddress.setFromLocalAddress(fd);
2031 
2032  // Create a server socket
2033  AsyncServerSocket::UniquePtr serverSocket(
2034  new AsyncServerSocket(&eventBase));
2035  serverSocket->useExistingSocket(fd);
2036  serverSocket->listen(16);
2037 
2038  // Make sure AsyncServerSocket reports the same address that we bound to
2039  folly::SocketAddress serverSocketAddress;
2040  serverSocket->getAddress(&serverSocketAddress);
2041  ASSERT_EQ(boundAddress, serverSocketAddress);
2042 
2043  // Make sure the socket works
2044  serverSocketSanityTest(serverSocket.get());
2045  }
2046 
2047  // Test creating a socket, binding and listening manually,
2048  // then giving it to AsyncServerSocket
2049  {
2050  // Manually create a socket
2051  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2052  ASSERT_GE(fd, 0);
2053  // bind
2054  struct sockaddr_in addr;
2055  addr.sin_family = AF_INET;
2056  addr.sin_port = 0;
2057  addr.sin_addr.s_addr = INADDR_ANY;
2058  ASSERT_EQ(
2059  bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
2060  // Look up the address that we bound to
2061  folly::SocketAddress boundAddress;
2062  boundAddress.setFromLocalAddress(fd);
2063  // listen
2064  ASSERT_EQ(listen(fd, 16), 0);
2065 
2066  // Create a server socket
2067  AsyncServerSocket::UniquePtr serverSocket(
2068  new AsyncServerSocket(&eventBase));
2069  serverSocket->useExistingSocket(fd);
2070 
2071  // Make sure AsyncServerSocket reports the same address that we bound to
2072  folly::SocketAddress serverSocketAddress;
2073  serverSocket->getAddress(&serverSocketAddress);
2074  ASSERT_EQ(boundAddress, serverSocketAddress);
2075 
2076  // Make sure the socket works
2077  serverSocketSanityTest(serverSocket.get());
2078  }
2079 }
#define ASSERT_GE(val1, val2)
Definition: gtest.h:1972
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
void setPort(uint16_t port)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
void setFromLocalAddress(int socket)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
void serverSocketSanityTest(AsyncServerSocket *serverSocket)
ThreadPoolListHook * addr
TEST ( AsyncSocketTest  ,
UnixDomainSocketTest   
)

Definition at line 2081 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::test::TestAcceptCallback::getEvents(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::Random::rand64(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::SocketAddress::setFromPath(), folly::netops::socket(), folly::test::TestAcceptCallback::TYPE_ACCEPT, folly::test::TestAcceptCallback::TYPE_START, and folly::test::TestAcceptCallback::TYPE_STOP.

2081  {
2082  EventBase eventBase;
2083 
2084  // Create a server socket
2085  std::shared_ptr<AsyncServerSocket> serverSocket(
2086  AsyncServerSocket::newSocket(&eventBase));
2087  string path(1, 0);
2088  path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2089  folly::SocketAddress serverAddress;
2090  serverAddress.setFromPath(path);
2091  serverSocket->bind(serverAddress);
2092  serverSocket->listen(16);
2093 
2094  // Add a callback to accept one connection then stop the loop
2095  TestAcceptCallback acceptCallback;
2096  acceptCallback.setConnectionAcceptedFn(
2097  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2098  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2099  });
2100  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2101  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2102  });
2103  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2104  serverSocket->startAccepting();
2105 
2106  // Connect to the server socket
2107  std::shared_ptr<AsyncSocket> socket(
2108  AsyncSocket::newSocket(&eventBase, serverAddress));
2109 
2110  eventBase.loop();
2111 
2112  // Verify that the server accepted a connection
2113  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2114  ASSERT_EQ(
2115  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
2116  ASSERT_EQ(
2117  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
2118  ASSERT_EQ(
2119  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
2120  int fd = acceptCallback.getEvents()->at(1).fd;
2121 
2122  // The accepted connection should already be in non-blocking mode
2123  int flags = fcntl(fd, F_GETFL, 0);
2124  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2125 }
void setFromPath(StringPiece path)
flags
Definition: http_parser.h:127
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::deque< EventInfo > * getEvents()
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
static uint64_t rand64()
Definition: Random.h:263
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
TEST ( AsyncSocketTest  ,
ConnectionEventCallbackDefault   
)

Definition at line 2127 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, folly::SocketAddress::getAddress(), folly::test::TestConnectionEventCallback::getBackoffEnded(), folly::test::TestConnectionEventCallback::getBackoffError(), folly::test::TestConnectionEventCallback::getBackoffStarted(), folly::test::TestConnectionEventCallback::getConnectionAccepted(), folly::test::TestConnectionEventCallback::getConnectionAcceptedError(), folly::test::TestConnectionEventCallback::getConnectionDequeuedByAcceptCallback(), folly::test::TestConnectionEventCallback::getConnectionDropped(), folly::test::TestConnectionEventCallback::getConnectionEnqueuedForAcceptCallback(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), and folly::netops::socket().

2127  {
2128  EventBase eventBase;
2129  TestConnectionEventCallback connectionEventCallback;
2130 
2131  // Create a server socket
2132  std::shared_ptr<AsyncServerSocket> serverSocket(
2133  AsyncServerSocket::newSocket(&eventBase));
2134  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2135  serverSocket->bind(0);
2136  serverSocket->listen(16);
2137  folly::SocketAddress serverAddress;
2138  serverSocket->getAddress(&serverAddress);
2139 
2140  // Add a callback to accept one connection then stop the loop
2141  TestAcceptCallback acceptCallback;
2142  acceptCallback.setConnectionAcceptedFn(
2143  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2144  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2145  });
2146  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2147  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2148  });
2149  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2150  serverSocket->startAccepting();
2151 
2152  // Connect to the server socket
2153  std::shared_ptr<AsyncSocket> socket(
2154  AsyncSocket::newSocket(&eventBase, serverAddress));
2155 
2156  eventBase.loop();
2157 
2158  // Validate the connection event counters
2159  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2160  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2161  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2162  ASSERT_EQ(
2163  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2164  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2165  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2166  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2167  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2168 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
unsigned int getConnectionAcceptedError() const
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
unsigned int getConnectionEnqueuedForAcceptCallback() const
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
unsigned int getConnectionDequeuedByAcceptCallback() const
TEST ( AsyncSocketTest  ,
CallbackInPrimaryEventBase   
)

Definition at line 2170 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_TRUE, folly::SocketAddress::getAddress(), folly::test::TestConnectionEventCallback::getBackoffEnded(), folly::test::TestConnectionEventCallback::getBackoffError(), folly::test::TestConnectionEventCallback::getBackoffStarted(), folly::test::TestConnectionEventCallback::getConnectionAccepted(), folly::test::TestConnectionEventCallback::getConnectionAcceptedError(), folly::test::TestConnectionEventCallback::getConnectionDequeuedByAcceptCallback(), folly::test::TestConnectionEventCallback::getConnectionDropped(), folly::test::TestConnectionEventCallback::getConnectionEnqueuedForAcceptCallback(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setAcceptStartedFn(), folly::test::TestAcceptCallback::setAcceptStoppedFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), and folly::netops::socket().

2170  {
2171  EventBase eventBase;
2172  TestConnectionEventCallback connectionEventCallback;
2173 
2174  // Create a server socket
2175  std::shared_ptr<AsyncServerSocket> serverSocket(
2176  AsyncServerSocket::newSocket(&eventBase));
2177  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2178  serverSocket->bind(0);
2179  serverSocket->listen(16);
2180  folly::SocketAddress serverAddress;
2181  serverSocket->getAddress(&serverAddress);
2182 
2183  // Add a callback to accept one connection then stop the loop
2184  TestAcceptCallback acceptCallback;
2185  acceptCallback.setConnectionAcceptedFn(
2186  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2187  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2188  });
2189  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2190  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2191  });
2192  bool acceptStartedFlag{false};
2193  acceptCallback.setAcceptStartedFn(
2194  [&acceptStartedFlag]() { acceptStartedFlag = true; });
2195  bool acceptStoppedFlag{false};
2196  acceptCallback.setAcceptStoppedFn(
2197  [&acceptStoppedFlag]() { acceptStoppedFlag = true; });
2198  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2199  serverSocket->startAccepting();
2200 
2201  // Connect to the server socket
2202  std::shared_ptr<AsyncSocket> socket(
2203  AsyncSocket::newSocket(&eventBase, serverAddress));
2204 
2205  eventBase.loop();
2206 
2207  ASSERT_TRUE(acceptStartedFlag);
2208  ASSERT_TRUE(acceptStoppedFlag);
2209  // Validate the connection event counters
2210  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2211  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2212  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2213  ASSERT_EQ(
2214  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2215  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2216  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2217  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2218  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2219 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
unsigned int getConnectionAcceptedError() const
void setAcceptStartedFn(const std::function< void()> &fn)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
unsigned int getConnectionEnqueuedForAcceptCallback() const
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void setAcceptStoppedFn(const std::function< void()> &fn)
unsigned int getConnectionDequeuedByAcceptCallback() const
TEST ( AsyncSocketTest  ,
CallbackInSecondaryEventBase   
)

Definition at line 2221 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_TRUE, folly::SocketAddress::getAddress(), folly::test::TestConnectionEventCallback::getBackoffEnded(), folly::test::TestConnectionEventCallback::getBackoffError(), folly::test::TestConnectionEventCallback::getBackoffStarted(), folly::test::TestConnectionEventCallback::getConnectionAccepted(), folly::test::TestConnectionEventCallback::getConnectionAcceptedError(), folly::test::TestConnectionEventCallback::getConnectionDequeuedByAcceptCallback(), folly::test::TestConnectionEventCallback::getConnectionDropped(), folly::test::TestConnectionEventCallback::getConnectionEnqueuedForAcceptCallback(), folly::ScopedEventBaseThread::getEventBase(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::Baton< MayBlock, Atom >::post(), folly::EventBase::runInEventBaseThread(), folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setAcceptStartedFn(), folly::test::TestAcceptCallback::setAcceptStoppedFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::netops::socket(), and folly::Baton< MayBlock, Atom >::try_wait_for().

2221  {
2222  EventBase eventBase;
2223  TestConnectionEventCallback connectionEventCallback;
2224 
2225  // Create a server socket
2226  std::shared_ptr<AsyncServerSocket> serverSocket(
2227  AsyncServerSocket::newSocket(&eventBase));
2228  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2229  serverSocket->bind(0);
2230  serverSocket->listen(16);
2231  SocketAddress serverAddress;
2232  serverSocket->getAddress(&serverAddress);
2233 
2234  // Add a callback to accept one connection then stop the loop
2235  TestAcceptCallback acceptCallback;
2236  ScopedEventBaseThread cobThread("ioworker_test");
2237  acceptCallback.setConnectionAcceptedFn(
2238  [&](int /* fd */, const SocketAddress& /* addr */) {
2239  eventBase.runInEventBaseThread([&] {
2240  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2241  });
2242  });
2243  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2244  eventBase.runInEventBaseThread(
2245  [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
2246  });
2247  std::atomic<bool> acceptStartedFlag{false};
2248  acceptCallback.setAcceptStartedFn([&]() { acceptStartedFlag = true; });
2249  Baton<> acceptStoppedFlag;
2250  acceptCallback.setAcceptStoppedFn([&]() { acceptStoppedFlag.post(); });
2251  serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
2252  serverSocket->startAccepting();
2253 
2254  // Connect to the server socket
2255  std::shared_ptr<AsyncSocket> socket(
2256  AsyncSocket::newSocket(&eventBase, serverAddress));
2257 
2258  eventBase.loop();
2259 
2260  ASSERT_TRUE(acceptStoppedFlag.try_wait_for(std::chrono::seconds(1)));
2261  ASSERT_TRUE(acceptStartedFlag);
2262  // Validate the connection event counters
2263  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2264  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2265  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2266  ASSERT_EQ(
2267  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2268  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2269  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2270  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2271  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2272 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
unsigned int getConnectionAcceptedError() const
FOLLY_ALWAYS_INLINE bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout, const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:206
void setAcceptStartedFn(const std::function< void()> &fn)
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
void post() noexcept
Definition: Baton.h:123
unsigned int getConnectionEnqueuedForAcceptCallback() const
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void setAcceptStoppedFn(const std::function< void()> &fn)
unsigned int getConnectionDequeuedByAcceptCallback() const
TEST ( AsyncSocketTest  ,
NumPendingMessagesInQueue   
)

Test AsyncServerSocket::getNumPendingMessagesInQueue()

Definition at line 2277 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, count, folly::SocketAddress::getAddress(), folly::ScopedEventBaseThread::getEventBase(), folly::EventBase::loop(), folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), folly::EventBase::runInEventBaseThread(), folly::EventBase::runInEventBaseThreadAndWait(), folly::test::TestAcceptCallback::setAcceptErrorFn(), and folly::test::TestAcceptCallback::setConnectionAcceptedFn().

2277  {
2278  EventBase eventBase;
2279 
2280  // Counter of how many connections have been accepted
2281  int count = 0;
2282 
2283  // Create a server socket
2284  auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2285  serverSocket->bind(0);
2286  serverSocket->listen(16);
2287  folly::SocketAddress serverAddress;
2288  serverSocket->getAddress(&serverAddress);
2289 
2290  // Add a callback to accept connections
2291  folly::ScopedEventBaseThread cobThread("ioworker_test");
2292  TestAcceptCallback acceptCallback;
2293  acceptCallback.setConnectionAcceptedFn(
2294  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2295  count++;
2296  eventBase.runInEventBaseThreadAndWait([&] {
2297  ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2298  });
2299  if (count == 4) {
2300  eventBase.runInEventBaseThread([&] {
2301  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2302  });
2303  }
2304  });
2305  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2306  eventBase.runInEventBaseThread(
2307  [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
2308  });
2309  serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
2310  serverSocket->startAccepting();
2311 
2312  // Connect to the server socket, 4 clients, there are 4 connections
2313  auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2314  auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2315  auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2316  auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2317 
2318  eventBase.loop();
2319  ASSERT_EQ(4, count);
2320 }
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
socklen_t getAddress(sockaddr_storage *addr) const
int * count
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
TEST ( AsyncSocketTest  ,
BufferTest   
)

Test AsyncTransport::BufferCallback

Definition at line 2325 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, TestServer::getAddress(), BufferCallback::hasBufferCleared(), BufferCallback::hasBuffered(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::NONE, option(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, and TestServer::verifyConnection().

2325  {
2326  TestServer server;
2327 
2328  EventBase evb;
2329  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2330  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2331  ConnCallback ccb;
2332  socket->connect(&ccb, server.getAddress(), 30, option);
2333 
2334  char buf[100 * 1024];
2335  memset(buf, 'c', sizeof(buf));
2336  WriteCallback wcb;
2337  BufferCallback bcb;
2338  socket->setBufferCallback(&bcb);
2339  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2340 
2341  evb.loop();
2344 
2345  ASSERT_TRUE(bcb.hasBuffered());
2347 
2348  socket->close();
2349  server.verifyConnection(buf, sizeof(buf));
2350 
2351  ASSERT_TRUE(socket->isClosedBySelf());
2352  ASSERT_FALSE(socket->isClosedByPeer());
2353 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
bool hasBuffered() const
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
StateEnum state
bool hasBufferCleared() const
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void verifyConnection(const char *buf, size_t len)
option(BUILD_SHARED_LIBS"Build shared libraries (DLLs)."OFF) option(gmock_build_tests"Build all of Google Mock's own tests."OFF) if(EXISTS"$
Definition: CMakeLists.txt:10
TEST ( AsyncSocketTest  ,
BufferCallbackKill   
)

Definition at line 2355 of file AsyncSocketTest2.cpp.

References testing::_, TestServer::accept(), addr, ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, folly::netops::connect(), folly::IOBuf::copyBuffer(), folly::IOBuf::create(), EXPECT_CALL, EXPECT_EQ, EXPECT_FALSE, EXPECT_LE, EXPECT_TRUE, TestServer::getAddress(), folly::SocketAddress::getAddress(), folly::INFO, testing::Invoke(), folly::SocketAddressTestHelper::isIPv4Enabled(), folly::SocketAddressTestHelper::isIPv6Enabled(), folly::SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4, folly::SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6, folly::EventBase::loop(), folly::EventBase::loopOnce(), MOCK_METHOD3, folly::AsyncSocket::newSocket(), folly::NONE, option(), fizz::detail::readBuf(), testing::SetErrnoAndReturn(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::ReadCallbackBase::state, folly::STATE_FAILED, folly::STATE_SUCCEEDED, folly::STATE_WAITING, WriteCallback::successCallback, folly::pushmi::detail::t, folly::test::TEST(), and fizz::detail::write().

2355  {
2356  TestServer server;
2357  EventBase evb;
2358  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2359  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2360  ConnCallback ccb;
2361  socket->connect(&ccb, server.getAddress(), 30, option);
2362  evb.loopOnce();
2363 
2364  char buf[100 * 1024];
2365  memset(buf, 'c', sizeof(buf));
2366  BufferCallback bcb;
2367  socket->setBufferCallback(&bcb);
2368  WriteCallback wcb;
2369  wcb.successCallback = [&] {
2370  ASSERT_TRUE(socket.unique());
2371  socket.reset();
2372  };
2373 
2374  // This will trigger AsyncSocket::handleWrite,
2375  // which calls WriteCallback::writeSuccess,
2376  // which calls wcb.successCallback above,
2377  // which tries to delete socket
2378  // Then, the socket will also try to use this BufferCallback
2379  // And that should crash us, if there is no DestructorGuard on the stack
2380  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2381 
2382  evb.loop();
2384 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
VoidCallback successCallback
StateEnum state
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
option(BUILD_SHARED_LIBS"Build shared libraries (DLLs)."OFF) option(gmock_build_tests"Build all of Google Mock's own tests."OFF) if(EXISTS"$
Definition: CMakeLists.txt:10
TEST ( AsyncSocketTest  ,
EvbCallbacks   
)

Definition at line 2900 of file AsyncSocketTest2.cpp.

References EXPECT_CALL, folly::gen::move, folly::AsyncSocket::newSocket(), folly::gen::seq(), and folly::netops::socket().

2900  {
2901  auto cb = std::make_unique<MockEvbChangeCallback>();
2902  EventBase evb;
2903  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2904 
2905  InSequence seq;
2906  EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
2907  EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
2908 
2909  socket->setEvbChangedCallback(std::move(cb));
2910  socket->detachEventBase();
2911  socket->attachEventBase(&evb);
2912 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Gen seq(Value first, Value last)
Definition: Base.h:484
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
#define EXPECT_CALL(obj, call)
TEST ( AsyncSocketTest  ,
TestEvbDetachWtRegisteredIOHandlers   
)

Definition at line 2914 of file AsyncSocketTest2.cpp.

References TestServer::accept(), folly::AsyncSocket::OptionKey::apply(), ASSERT_EQ, ASSERT_FALSE, ASSERT_GT, ASSERT_TRUE, EXPECT_CALL, EXPECT_EQ, EXPECT_LE, EXPECT_TRUE, TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), folly::EventBase::loopOnce(), folly::gen::move, folly::AsyncSocket::newSocket(), folly::pushmi::__adl::noexcept(), folly::gen::seq(), folly::netops::socket(), socket_, ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, folly::test::TEST(), uint32_t, and folly::AsyncSocketException::UNKNOWN.

2914  {
2915  // Start listening on a local port
2916  TestServer server;
2917 
2918  // Connect using a AsyncSocket
2919  EventBase evb;
2920  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2921  ConnCallback cb;
2922  socket->connect(&cb, server.getAddress(), 30);
2923 
2924  evb.loop();
2925 
2927  EXPECT_LE(0, socket->getConnectTime().count());
2928  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2929 
2930  // After the ioHandlers are registered, still should be able to detach/attach
2931  ReadCallback rcb;
2932  socket->setReadCB(&rcb);
2933 
2934  auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
2935  InSequence seq;
2936  EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
2937  EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
2938 
2939  socket->setEvbChangedCallback(std::move(cbEvbChg));
2940  EXPECT_TRUE(socket->isDetachable());
2941  socket->detachEventBase();
2942  socket->attachEventBase(&evb);
2943 
2944  socket->close();
2945 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Gen seq(Value first, Value last)
Definition: Base.h:484
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const folly::SocketAddress & getAddress() const
#define EXPECT_CALL(obj, call)
TEST ( AsyncSocket  ,
PreReceivedData   
)

Definition at line 3072 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), folly::IOBuf::copyBuffer(), TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), and folly::netops::socket().

3072  {
3073  TestServer server;
3074 
3075  EventBase evb;
3076  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3077  socket->connect(nullptr, server.getAddress(), 30);
3078  evb.loop();
3079 
3080  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3081 
3082  auto acceptedSocket = server.acceptAsync(&evb);
3083 
3084  ReadCallback peekCallback(2);
3085  ReadCallback readCallback;
3086  peekCallback.dataAvailableCallback = [&]() {
3087  peekCallback.verifyData("he", 2);
3088  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
3089  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
3090  acceptedSocket->setReadCB(nullptr);
3091  acceptedSocket->setReadCB(&readCallback);
3092  };
3093  readCallback.dataAvailableCallback = [&]() {
3094  if (readCallback.dataRead() == 5) {
3095  readCallback.verifyData("hello", 5);
3096  acceptedSocket->setReadCB(nullptr);
3097  }
3098  };
3099 
3100  acceptedSocket->setReadCB(&peekCallback);
3101 
3102  evb.loop();
3103 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::unique_ptr< IOBuf > copyBuffer(const folly::IOBuf &buf)
const folly::SocketAddress & getAddress() const
TEST ( AsyncSocket  ,
PreReceivedDataOnly   
)

Definition at line 3105 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), folly::IOBuf::copyBuffer(), TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), and folly::netops::socket().

3105  {
3106  TestServer server;
3107 
3108  EventBase evb;
3109  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3110  socket->connect(nullptr, server.getAddress(), 30);
3111  evb.loop();
3112 
3113  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3114 
3115  auto acceptedSocket = server.acceptAsync(&evb);
3116 
3117  ReadCallback peekCallback;
3118  ReadCallback readCallback;
3119  peekCallback.dataAvailableCallback = [&]() {
3120  peekCallback.verifyData("hello", 5);
3121  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3122  acceptedSocket->setReadCB(&readCallback);
3123  };
3124  readCallback.dataAvailableCallback = [&]() {
3125  readCallback.verifyData("hello", 5);
3126  acceptedSocket->setReadCB(nullptr);
3127  };
3128 
3129  acceptedSocket->setReadCB(&peekCallback);
3130 
3131  evb.loop();
3132 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::unique_ptr< IOBuf > copyBuffer(const folly::IOBuf &buf)
const folly::SocketAddress & getAddress() const
TEST ( AsyncSocket  ,
PreReceivedDataPartial   
)

Definition at line 3134 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), folly::IOBuf::copyBuffer(), TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), and folly::netops::socket().

3134  {
3135  TestServer server;
3136 
3137  EventBase evb;
3138  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3139  socket->connect(nullptr, server.getAddress(), 30);
3140  evb.loop();
3141 
3142  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3143 
3144  auto acceptedSocket = server.acceptAsync(&evb);
3145 
3146  ReadCallback peekCallback;
3147  ReadCallback smallReadCallback(3);
3148  ReadCallback normalReadCallback;
3149  peekCallback.dataAvailableCallback = [&]() {
3150  peekCallback.verifyData("hello", 5);
3151  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3152  acceptedSocket->setReadCB(&smallReadCallback);
3153  };
3154  smallReadCallback.dataAvailableCallback = [&]() {
3155  smallReadCallback.verifyData("hel", 3);
3156  acceptedSocket->setReadCB(&normalReadCallback);
3157  };
3158  normalReadCallback.dataAvailableCallback = [&]() {
3159  normalReadCallback.verifyData("lo", 2);
3160  acceptedSocket->setReadCB(nullptr);
3161  };
3162 
3163  acceptedSocket->setReadCB(&peekCallback);
3164 
3165  evb.loop();
3166 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::unique_ptr< IOBuf > copyBuffer(const folly::IOBuf &buf)
const folly::SocketAddress & getAddress() const
TEST ( AsyncSocket  ,
PreReceivedDataTakeover   
)

Definition at line 3168 of file AsyncSocketTest2.cpp.

References TestServer::accept(), TestServer::acceptFD(), ASSERT_EQ, ASSERT_FALSE, ASSERT_GE, ASSERT_NE, ASSERT_TRUE, folly::netops::close(), folly::IOBuf::copyBuffer(), EXPECT_EQ, EXPECT_TRUE, folly::test::TemporaryFile::fd(), TestServer::getAddress(), folly::SocketAddress::getAddress(), folly::test::TestAcceptCallback::getEvents(), folly::netops::getsockopt(), folly::EventBase::loop(), folly::gen::move, folly::AsyncServerSocket::newSocket(), folly::AsyncSocket::newSocket(), TestSendMsgParamsCallback::queriedData_, TestSendMsgParamsCallback::queriedFlags_, fizz::detail::read(), folly::netops::recvmsg(), TestSendMsgParamsCallback::reset(), SCOPE_EXIT, folly::test::TestAcceptCallback::setAcceptErrorFn(), folly::test::TestAcceptCallback::setConnectionAcceptedFn(), folly::netops::socket(), folly::netops::socketpair(), ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, string, folly::test::TEST(), uint32_t, folly::test::TemporaryFile::UNLINK_IMMEDIATELY, folly::value(), and fizz::detail::write().

3168  {
3169  TestServer server;
3170 
3171  EventBase evb;
3172  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3173  socket->connect(nullptr, server.getAddress(), 30);
3174  evb.loop();
3175 
3176  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3177 
3178  auto acceptedSocket =
3179  AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
3180  AsyncSocket::UniquePtr takeoverSocket;
3181 
3182  ReadCallback peekCallback(3);
3183  ReadCallback readCallback;
3184  peekCallback.dataAvailableCallback = [&]() {
3185  peekCallback.verifyData("hel", 3);
3186  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3187  acceptedSocket->setReadCB(nullptr);
3188  takeoverSocket =
3189  AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
3190  takeoverSocket->setReadCB(&readCallback);
3191  };
3192  readCallback.dataAvailableCallback = [&]() {
3193  readCallback.verifyData("hello", 5);
3194  takeoverSocket->setReadCB(nullptr);
3195  };
3196 
3197  acceptedSocket->setReadCB(&peekCallback);
3198 
3199  evb.loop();
3200 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
int acceptFD(int timeout=50)
std::unique_ptr< IOBuf > copyBuffer(const folly::IOBuf &buf)
const folly::SocketAddress & getAddress() const
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
TEST_P ( AsyncSocketConnectTest  ,
ConnectAndWrite   
)

Test writing immediately after connecting, without waiting for connect to finish.

Definition at line 209 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, ENABLED, EXPECT_EQ, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, and TestServer::verifyConnection().

209  {
210  TestServer server;
211 
212  // connect()
213  EventBase evb;
214  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
215 
216  if (GetParam() == TFOState::ENABLED) {
217  socket->enableTFO();
218  }
219 
220  ConnCallback ccb;
221  socket->connect(&ccb, server.getAddress(), 30);
222 
223  // write()
224  char buf[128];
225  memset(buf, 'a', sizeof(buf));
226  WriteCallback wcb;
227  socket->write(&wcb, buf, sizeof(buf));
228 
229  // Loop. We don't bother accepting on the server socket yet.
230  // The kernel should be able to buffer the write request so it can succeed.
231  evb.loop();
232 
235 
236  // Make sure the server got a connection and received the data
237  socket->close();
238  server.verifyConnection(buf, sizeof(buf));
239 
240  ASSERT_TRUE(socket->isClosedBySelf());
241  ASSERT_FALSE(socket->isClosedByPeer());
242  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
243 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void verifyConnection(const char *buf, size_t len)
TEST_P ( AsyncSocketConnectTest  ,
ConnectNullCallback   
)

Test connecting using a nullptr connect callback.

Definition at line 248 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, ENABLED, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), WriteCallback::state, folly::STATE_SUCCEEDED, and TestServer::verifyConnection().

248  {
249  TestServer server;
250 
251  // connect()
252  EventBase evb;
253  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
254  if (GetParam() == TFOState::ENABLED) {
255  socket->enableTFO();
256  }
257 
258  socket->connect(nullptr, server.getAddress(), 30);
259 
260  // write some data, just so we have some way of verifing
261  // that the socket works correctly after connecting
262  char buf[128];
263  memset(buf, 'a', sizeof(buf));
264  WriteCallback wcb;
265  socket->write(&wcb, buf, sizeof(buf));
266 
267  evb.loop();
268 
270 
271  // Make sure the server got a connection and received the data
272  socket->close();
273  server.verifyConnection(buf, sizeof(buf));
274 
275  ASSERT_TRUE(socket->isClosedBySelf());
276  ASSERT_FALSE(socket->isClosedByPeer());
277 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void verifyConnection(const char *buf, size_t len)
TEST_P ( AsyncSocketConnectTest  ,
ConnectWriteAndClose   
)

Test calling both write() and close() immediately after connecting, without waiting for connect to finish.

This exercises the STATE_CONNECTING_CLOSING code.

Definition at line 285 of file AsyncSocketTest2.cpp.

References ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, ENABLED, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, and TestServer::verifyConnection().

285  {
286  TestServer server;
287 
288  // connect()
289  EventBase evb;
290  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
291  if (GetParam() == TFOState::ENABLED) {
292  socket->enableTFO();
293  }
294  ConnCallback ccb;
295  socket->connect(&ccb, server.getAddress(), 30);
296 
297  // write()
298  char buf[128];
299  memset(buf, 'a', sizeof(buf));
300  WriteCallback wcb;
301  socket->write(&wcb, buf, sizeof(buf));
302 
303  // close()
304  socket->close();
305 
306  // Loop. We don't bother accepting on the server socket yet.
307  // The kernel should be able to buffer the write request so it can succeed.
308  evb.loop();
309 
312 
313  // Make sure the server got a connection and received the data
314  server.verifyConnection(buf, sizeof(buf));
315 
316  ASSERT_TRUE(socket->isClosedBySelf());
317  ASSERT_FALSE(socket->isClosedByPeer());
318 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void verifyConnection(const char *buf, size_t len)
TEST_P ( AsyncSocketConnectTest  ,
ConnectAndRead   
)

Test installing a read callback immediately, before connect() finishes.

Definition at line 431 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, folly::ReadCallback::buffers, folly::IOBuf::copyBuffer(), ENABLED, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::netops::socket(), ConnCallback::state, folly::STATE_SUCCEEDED, and uint8_t.

431  {
432  TestServer server;
433 
434  // connect()
435  EventBase evb;
436  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
437  if (GetParam() == TFOState::ENABLED) {
438  socket->enableTFO();
439  }
440 
441  ConnCallback ccb;
442  socket->connect(&ccb, server.getAddress(), 30);
443 
444  ReadCallback rcb;
445  socket->setReadCB(&rcb);
446 
447  if (GetParam() == TFOState::ENABLED) {
448  // Trigger a connection
449  socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
450  }
451 
452  // Even though we haven't looped yet, we should be able to accept
453  // the connection and send data to it.
454  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
455  uint8_t buf[128];
456  memset(buf, 'a', sizeof(buf));
457  acceptedSocket->write(buf, sizeof(buf));
458  acceptedSocket->flush();
459  acceptedSocket->close();
460 
461  // Loop, although there shouldn't be anything to do.
462  evb.loop();
463 
465  ASSERT_EQ(rcb.buffers.size(), 1);
466  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
467  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
468 
469  ASSERT_FALSE(socket->isClosedBySelf());
470  ASSERT_FALSE(socket->isClosedByPeer());
471 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
std::vector< Buffer > buffers
StateEnum state
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
std::unique_ptr< IOBuf > copyBuffer(const folly::IOBuf &buf)
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
TEST_P ( AsyncSocketConnectTest  ,
ConnectWriteAndRead   
)

Test both writing and installing a read callback immediately, before connect() finishes.

Definition at line 515 of file AsyncSocketTest2.cpp.

References TestServer::accept(), ASSERT_EQ, ASSERT_FALSE, ASSERT_TRUE, folly::ReadCallback::buffers, ENABLED, TestServer::getAddress(), folly::EventBase::loop(), folly::AsyncSocket::newSocket(), folly::shutdown(), folly::netops::socket(), ConnCallback::state, folly::ReadCallbackBase::state, folly::STATE_SUCCEEDED, uint32_t, and uint8_t.

515  {
516  TestServer server;
517 
518  // connect()
519  EventBase evb;
520  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
521  if (GetParam() == TFOState::ENABLED) {
522  socket->enableTFO();
523  }
524  ConnCallback ccb;
525  socket->connect(&ccb, server.getAddress(), 30);
526 
527  // write()
528  char buf1[128];
529  memset(buf1, 'a', sizeof(buf1));
530  WriteCallback wcb;
531  socket->write(&wcb, buf1, sizeof(buf1));
532 
533  // set a read callback
534  ReadCallback rcb;
535  socket->setReadCB(&rcb);
536 
537  // Even though we haven't looped yet, we should be able to accept
538  // the connection and send data to it.
539  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
540  uint8_t buf2[128];
541  memset(buf2, 'b', sizeof(buf2));
542  acceptedSocket->write(buf2, sizeof(buf2));
543  acceptedSocket->flush();
544 
545  // shut down the write half of acceptedSocket, so that the AsyncSocket
546  // will stop reading and we can break out of the event loop.
547  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
548 
549  // Loop
550  evb.loop();
551 
552  // Make sure the connect succeeded
554 
555  // Make sure the AsyncSocket read the data written by the accepted socket
557  ASSERT_EQ(rcb.buffers.size(), 1);
558  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
559  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
560 
561  // Close the AsyncSocket so we'll see EOF on acceptedSocket
562  socket->close();
563 
564  // Make sure the accepted socket saw the data written by the AsyncSocket
565  uint8_t readbuf[sizeof(buf1)];
566  acceptedSocket->readAll(readbuf, sizeof(readbuf));
567  ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
568  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
569  ASSERT_EQ(bytesRead, 0);
570 
571  ASSERT_FALSE(socket->isClosedBySelf());
572  ASSERT_TRUE(socket->isClosedByPeer());
573 }
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::shared_ptr< BlockingSocket > accept(int timeout=50)
std::vector< Buffer > buffers
StateEnum state
void shutdown(Counter &)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
const folly::SocketAddress & getAddress() const
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void testConnectOptWrite ( size_t  size1,
size_t  size2,
bool  close = false 
)

Test connect+write, then have the connect callback perform another write.

This tests interaction of the optimistic writing after connect with additional write attempts that occur in the connect callback.

Definition at line 858 of file AsyncSocketTest2.cpp.

References TestServer::acceptAsync(), ASSERT_EQ, folly::netops::bind(), folly::ReadCallback::buffers, folly::netops::close(), folly::test::end(), TestServer::getAddress(), folly::INFO, folly::EventBase::loop(), min, folly::AsyncSocket::newSocket(), folly::AsyncSocket::setReadCB(), folly::netops::socket(), start, ConnCallback::state, WriteCallback::state, folly::STATE_SUCCEEDED, ConnCallback::successCallback, WriteCallback::successCallback, and tmpDisableReads().

Referenced by TEST().

858  {
859  TestServer server;
860  EventBase evb;
861  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
862 
863  // connect()
864  ConnCallback ccb;
865  socket->connect(&ccb, server.getAddress(), 30);
866 
867  // Hopefully the connect didn't succeed immediately.
868  // If it did, we can't exercise the optimistic write code path.
869  if (ccb.state == STATE_SUCCEEDED) {
870  LOG(INFO) << "connect() succeeded immediately; aborting test "
871  "of optimistic write behavior";
872  return;
873  }
874 
875  // Tell the connect callback to perform a write when the connect succeeds
876  WriteCallback wcb2;
877  std::unique_ptr<char[]> buf2(new char[size2]);
878  memset(buf2.get(), 'b', size2);
879  if (size2 > 0) {
880  ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
881  // Tell the second write callback to close the connection when it is done
882  wcb2.successCallback = [&] { socket->closeNow(); };
883  }
884 
885  // Schedule one write() immediately, before the connect finishes
886  std::unique_ptr<char[]> buf1(new char[size1]);
887  memset(buf1.get(), 'a', size1);
888  WriteCallback wcb1;
889  if (size1 > 0) {
890  socket->write(&wcb1, buf1.get(), size1);
891  }
892 
893  if (close) {
894  // immediately perform a close, before connect() completes
895  socket->close();
896  }
897 
898  // Start reading from the other endpoint after 10ms.
899  // If we're using large buffers, we have to read so that the writes don't
900  // block forever.
901  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
902  ReadCallback rcb;
903  rcb.dataAvailableCallback =
904  std::bind(tmpDisableReads, acceptedSocket.get(), &rcb);
905  socket->getEventBase()->tryRunAfterDelay(
906  std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb), 10);
907 
908  // Loop. We don't bother accepting on the server socket yet.
909  // The kernel should be able to buffer the write request so it can succeed.
910  evb.loop();
911 
913  if (size1 > 0) {
914  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
915  }
916  if (size2 > 0) {
918  }
919 
920  socket->close();
921 
922  // Make sure the read callback received all of the data
923  size_t bytesRead = 0;
924  for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
925  it != rcb.buffers.end();
926  ++it) {
927  size_t start = bytesRead;
928  bytesRead += it->length;
929  size_t end = bytesRead;
930  if (start < size1) {
931  size_t cmpLen = min(size1, end) - start;
932  ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
933  }
934  if (end > size1 && end <= size1 + size2) {
935  size_t itOffset;
936  size_t buf2Offset;
937  size_t cmpLen;
938  if (start >= size1) {
939  itOffset = 0;
940  buf2Offset = start - size1;
941  cmpLen = end - start;
942  } else {
943  itOffset = size1 - start;
944  buf2Offset = 0;
945  cmpLen = end - size1;
946  }
947  ASSERT_EQ(
948  memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, cmpLen), 0);
949  }
950  }
951  ASSERT_EQ(bytesRead, size1 + size2);
952 }
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::vector< Buffer > buffers
VoidCallback successCallback
VoidCallback successCallback
LogLevel min
Definition: LogLevel.cpp:30
StateEnum state
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
StateEnum state
void tmpDisableReads(AsyncSocket *socket, ReadCallback *rcb)
auto start
const folly::SocketAddress & getAddress() const
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
int close(NetworkSocket s)
Definition: NetOps.cpp:90
void tmpDisableReads ( AsyncSocket socket,
ReadCallback rcb 
)

Definition at line 844 of file AsyncSocketTest2.cpp.

References folly::netops::bind(), folly::AsyncSocket::getEventBase(), folly::EventBase::runInLoop(), and folly::AsyncSocket::setReadCB().

Referenced by testConnectOptWrite().

844  {
845  // Uninstall the read callback
846  socket->setReadCB(nullptr);
847  // Schedule the read callback to be reinstalled after 1ms
848  socket->getEventBase()->runInLoop(
849  std::bind(&AsyncSocket::setReadCB, socket, rcb));
850 }
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
void setReadCB(ReadCallback *callback) override
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
EventBase * getEventBase() const override
Definition: AsyncSocket.h:328