39 #include <sys/types.h> 48 using std::unique_ptr;
50 using std::chrono::milliseconds;
52 using namespace folly;
61 const std::shared_ptr<AsyncSocket>&
socket,
62 unique_ptr<IOBuf>&& bufs,
65 bool lastWrite =
false)
71 lastWrite_(lastWrite) {}
96 TEST(AsyncSocketTest, Connect) {
105 socket->connect(&cb, server.
getAddress(), 30);
111 EXPECT_LE(0, socket->getConnectTime().count());
112 EXPECT_GE(socket->getConnectStartTime(), startedAt);
113 EXPECT_LE(socket->getConnectStartTime(), socket->getConnectEndTime());
114 EXPECT_LE(socket->getConnectEndTime(), finishedAt);
115 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
126 std::vector<TFOState> vals;
143 TEST(AsyncSocketTest, ConnectRefused) {
151 socket->connect(&cb, addr, 30);
157 EXPECT_LE(0, socket->getConnectTime().count());
158 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
164 TEST(AsyncSocketTest, ConnectTimeout) {
182 socket->connect(&cb, addr, 1);
191 SKIP() <<
"do not have a routable but unreachable IP address";
199 socket->getPeerAddress(&peer);
201 EXPECT_LE(0, socket->getConnectTime().count());
202 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
221 socket->connect(&ccb, server.
getAddress(), 30);
225 memset(buf,
'a',
sizeof(buf));
227 socket->write(&wcb, buf,
sizeof(buf));
242 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
258 socket->connect(
nullptr, server.
getAddress(), 30);
263 memset(buf,
'a',
sizeof(buf));
265 socket->write(&wcb, buf,
sizeof(buf));
295 socket->connect(&ccb, server.
getAddress(), 30);
299 memset(buf,
'a',
sizeof(buf));
301 socket->write(&wcb, buf,
sizeof(buf));
323 TEST(AsyncSocketTest, ConnectAndClose) {
330 socket->connect(&ccb, server.
getAddress(), 30);
335 LOG(
INFO) <<
"connect() succeeded immediately; aborting test " 336 "of close-during-connect behavior";
357 TEST(AsyncSocketTest, ConnectAndCloseNow) {
364 socket->connect(&ccb, server.
getAddress(), 30);
369 LOG(
INFO) <<
"connect() succeeded immediately; aborting test " 370 "of closeNow()-during-connect behavior";
392 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
399 socket->connect(&ccb, server.
getAddress(), 30);
404 LOG(
INFO) <<
"connect() succeeded immediately; aborting test " 405 "of write-during-connect behavior";
411 memset(buf,
'a',
sizeof(buf));
413 socket->write(&wcb, buf,
sizeof(buf));
442 socket->connect(&ccb, server.
getAddress(), 30);
445 socket->setReadCB(&rcb);
454 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
456 memset(buf,
'a',
sizeof(buf));
457 acceptedSocket->write(buf,
sizeof(buf));
458 acceptedSocket->flush();
459 acceptedSocket->close();
477 TEST(AsyncSocketTest, ConnectReadAndClose) {
484 socket->connect(&ccb, server.
getAddress(), 30);
489 LOG(
INFO) <<
"connect() succeeded immediately; aborting test " 490 "of read-during-connect behavior";
495 socket->setReadCB(&rcb);
525 socket->connect(&ccb, server.
getAddress(), 30);
529 memset(buf1,
'a',
sizeof(buf1));
531 socket->write(&wcb, buf1,
sizeof(buf1));
535 socket->setReadCB(&rcb);
539 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
541 memset(buf2,
'b',
sizeof(buf2));
542 acceptedSocket->write(buf2,
sizeof(buf2));
543 acceptedSocket->flush();
547 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
566 acceptedSocket->readAll(readbuf,
sizeof(readbuf));
567 ASSERT_EQ(memcmp(buf1, readbuf,
sizeof(buf1)), 0);
568 uint32_t bytesRead = acceptedSocket->read(readbuf,
sizeof(readbuf));
579 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
586 socket->connect(&ccb, server.
getAddress(), 30);
591 LOG(
INFO) <<
"connect() succeeded immediately; skipping test";
597 memset(wbuf,
'a',
sizeof(wbuf));
599 socket->write(&wcb, wbuf,
sizeof(wbuf));
600 socket->shutdownWrite();
603 socket->shutdownWrite();
607 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
611 struct pollfd fds[1];
612 fds[0].fd = acceptedSocket->getSocketFD();
613 fds[0].events = POLLIN;
615 int rc =
poll(fds, 1, 0);
620 memset(acceptedWbuf,
'b',
sizeof(acceptedWbuf));
621 acceptedSocket->write(acceptedWbuf,
sizeof(acceptedWbuf));
622 acceptedSocket->flush();
638 acceptedSocket->readAll(readbuf,
sizeof(readbuf));
639 ASSERT_EQ(memcmp(wbuf, readbuf,
sizeof(wbuf)), 0);
640 uint32_t bytesRead = acceptedSocket->read(readbuf,
sizeof(readbuf));
645 acceptedSocket->close();
649 socket->setReadCB(&rcb);
657 memcmp(rcb.
buffers[0].buffer, acceptedWbuf,
sizeof(acceptedWbuf)), 0);
667 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
674 socket->connect(&ccb, server.
getAddress(), 30);
679 LOG(
INFO) <<
"connect() succeeded immediately; skipping test";
685 socket->setReadCB(&rcb);
689 memset(wbuf,
'a',
sizeof(wbuf));
691 socket->write(&wcb, wbuf,
sizeof(wbuf));
694 socket->shutdownWrite();
698 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
702 struct pollfd fds[1];
703 fds[0].fd = acceptedSocket->getSocketFD();
704 fds[0].events = POLLIN;
706 int rc =
poll(fds, 1, 0);
711 memset(acceptedWbuf,
'b',
sizeof(acceptedWbuf));
712 acceptedSocket->write(acceptedWbuf,
sizeof(acceptedWbuf));
713 acceptedSocket->flush();
716 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
732 memcmp(rcb.
buffers[0].buffer, acceptedWbuf,
sizeof(acceptedWbuf)), 0);
738 acceptedSocket->readAll(readbuf,
sizeof(readbuf));
739 ASSERT_EQ(memcmp(wbuf, readbuf,
sizeof(wbuf)), 0);
740 uint32_t bytesRead = acceptedSocket->read(readbuf,
sizeof(readbuf));
744 acceptedSocket->close();
755 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
762 socket->connect(&ccb, server.
getAddress(), 30);
767 LOG(
INFO) <<
"connect() succeeded immediately; skipping test";
773 socket->setReadCB(&rcb);
777 memset(wbuf,
'a',
sizeof(wbuf));
779 socket->write(&wcb, wbuf,
sizeof(wbuf));
783 socket->shutdownWriteNow();
791 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
795 struct pollfd fds[1];
796 fds[0].fd = acceptedSocket->getSocketFD();
797 fds[0].events = POLLIN;
799 int rc =
poll(fds, 1, 0);
804 memset(acceptedWbuf,
'b',
sizeof(acceptedWbuf));
805 acceptedSocket->write(acceptedWbuf,
sizeof(acceptedWbuf));
806 acceptedSocket->flush();
809 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
825 memcmp(rcb.
buffers[0].buffer, acceptedWbuf,
sizeof(acceptedWbuf)), 0);
831 uint32_t bytesRead = acceptedSocket->read(readbuf,
sizeof(readbuf));
835 acceptedSocket->close();
865 socket->connect(&ccb, server.
getAddress(), 30);
870 LOG(
INFO) <<
"connect() succeeded immediately; aborting test " 871 "of optimistic write behavior";
877 std::unique_ptr<char[]> buf2(
new char[size2]);
878 memset(buf2.get(),
'b', size2);
880 ccb.
successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
886 std::unique_ptr<char[]> buf1(
new char[size1]);
887 memset(buf1.get(),
'a', size1);
890 socket->write(&wcb1, buf1.get(), size1);
901 std::shared_ptr<AsyncSocket> acceptedSocket = server.
acceptAsync(&evb);
903 rcb.dataAvailableCallback =
905 socket->getEventBase()->tryRunAfterDelay(
923 size_t bytesRead = 0;
924 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.
buffers.begin();
927 size_t start = bytesRead;
928 bytesRead += it->length;
929 size_t end = bytesRead;
931 size_t cmpLen =
min(size1, end) -
start;
934 if (end > size1 && end <= size1 + size2) {
938 if (start >= size1) {
940 buf2Offset = start - size1;
941 cmpLen = end -
start;
943 itOffset = size1 -
start;
945 cmpLen = end - size1;
948 memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, cmpLen), 0);
954 TEST(AsyncSocketTest, ConnectCallbackWrite) {
959 const size_t largeSize = 32 * 1024 * 1024;
992 TEST(AsyncSocketTest, WriteNullCallback) {
997 std::shared_ptr<AsyncSocket>
socket =
1003 memset(buf,
'a',
sizeof(buf));
1004 socket->write(
nullptr, buf,
sizeof(buf));
1019 TEST(AsyncSocketTest, WriteTimeout) {
1024 std::shared_ptr<AsyncSocket>
socket =
1033 size_t writeLength = 32 * 1024 * 1024;
1035 socket->setSendTimeout(timeout);
1036 std::unique_ptr<char[]> buf(
new char[writeLength]);
1037 memset(buf.get(),
'a', writeLength);
1039 socket->write(&wcb, buf.get(), writeLength);
1069 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1075 TEST(AsyncSocketTest, WritePipeError) {
1080 std::shared_ptr<AsyncSocket>
socket =
1082 socket->setSendTimeout(1000);
1086 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
1087 acceptedSocket->close();
1090 size_t writeLength = 32 * 1024 * 1024;
1091 std::unique_ptr<char[]> buf(
new char[writeLength]);
1092 memset(buf.get(),
'a', writeLength);
1094 socket->write(&wcb, buf.get(), writeLength);
1111 TEST(AsyncSocketTest, WriteAfterReadEOF) {
1116 std::shared_ptr<AsyncSocket>
socket =
1121 std::shared_ptr<AsyncSocket> acceptedSocket = server.
acceptAsync(&evb);
1123 acceptedSocket->setReadCB(&rcb);
1126 socket->shutdownWrite();
1134 constexpr
size_t simpleBufLength = 5;
1135 char simpleBuf[simpleBufLength];
1136 memset(simpleBuf,
'a', simpleBufLength);
1138 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1149 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1153 constexpr
size_t kSockBufSize = 8 * 1024;
1154 constexpr
size_t kEffectiveSockBufSize = 2 * kSockBufSize;
1159 {{SOL_SOCKET, SO_SNDBUF}, int(kSockBufSize)},
1160 {{SOL_SOCKET, SO_RCVBUF}, int(kSockBufSize)},
1161 {{IPPROTO_TCP, TCP_NODELAY}, 1},
1167 std::thread senderThread([&]() { senderEvb.
loopForever(); });
1170 std::shared_ptr<AsyncSocket>
socket;
1174 socket->connect(&ccb, server.
getAddress(), 30, options);
1178 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
1181 constexpr
size_t kSendSize = 100 * 1024;
1182 auto const sendBuf = std::vector<char>(kSendSize,
'a');
1187 [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
1191 constexpr
size_t kRecvSize = 20 * 1024;
1193 auto bytesRead = acceptedSocket->readAll(recvBuf,
sizeof(recvBuf));
1195 EXPECT_EQ(0, memcmp(recvBuf, sendBuf.data(), bytesRead));
1199 constexpr
size_t kMinExpectedBytesWritten = kRecvSize + kSockBufSize;
1204 constexpr
size_t kMaxExpectedBytesWritten =
1205 kRecvSize + kEffectiveSockBufSize + kEffectiveSockBufSize;
1207 kMaxExpectedBytesWritten < kSendSize,
"kSendSize set too small");
1211 using clock = std::chrono::steady_clock;
1212 auto const deadline =
clock::now() + std::chrono::seconds(2);
1213 while (wcb.bytesWritten < kMinExpectedBytesWritten &&
1217 acceptedSocket->closeWithReset();
1220 senderThread.join();
1223 ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
1224 ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
1230 TEST(AsyncSocketTest, WriteIOBuf) {
1237 socket->connect(&ccb, server.
getAddress(), 30);
1240 std::shared_ptr<AsyncSocket> acceptedSocket = server.
acceptAsync(&evb);
1242 acceptedSocket->setReadCB(&rcb);
1246 socket->setEorTracking(
true);
1248 socket->setEorTracking(
false);
1252 constexpr
size_t simpleBufLength = 5;
1253 char simpleBuf[simpleBufLength];
1254 memset(simpleBuf,
'a', simpleBufLength);
1256 socket->write(&wcb, simpleBuf, simpleBufLength);
1259 size_t buf1Length = 7;
1262 buf1->
append(buf1Length);
1263 unique_ptr<IOBuf> buf1Copy(buf1->
clone());
1265 socket->writeChain(&wcb2,
std::move(buf1));
1268 size_t buf2Length = 11;
1271 buf2->
append(buf2Length);
1272 size_t buf3Length = 13;
1275 buf3->
append(buf3Length);
1277 unique_ptr<IOBuf> buf2Copy(buf2->
clone());
1278 buf2Copy->coalesce();
1280 socket->writeChain(&wcb3,
std::move(buf2));
1281 socket->shutdownWrite();
1295 simpleBufLength + buf1Length + buf2Length + buf3Length);
1299 rcb.
buffers[0].buffer + simpleBufLength,
1301 buf1Copy->length()),
1305 rcb.
buffers[0].buffer + simpleBufLength + buf1Length,
1307 buf2Copy->length()),
1310 acceptedSocket->close();
1317 TEST(AsyncSocketTest, WriteIOBufCorked) {
1324 socket->connect(&ccb, server.
getAddress(), 30);
1327 std::shared_ptr<AsyncSocket> acceptedSocket = server.
acceptAsync(&evb);
1329 acceptedSocket->setReadCB(&rcb);
1335 size_t buf1Length = 5;
1338 buf1->
append(buf1Length);
1339 size_t buf2Length = 7;
1342 buf2->
append(buf2Length);
1343 size_t buf3Length = 11;
1346 buf3->
append(buf3Length);
1348 socket->writeChain(&wcb1,
std::move(buf1));
1371 acceptedSocket->close();
1381 TEST(AsyncSocketTest, ZeroLengthWrite) {
1386 std::shared_ptr<AsyncSocket>
socket =
1392 acceptedSocket->setReadCB(&rcb);
1394 size_t len1 = 1024 * 1024;
1395 size_t len2 = 1024 * 1024;
1396 std::unique_ptr<char[]> buf(
new char[len1 + len2]);
1397 memset(buf.get(),
'a', len1);
1398 memset(buf.get(),
'b', len2);
1404 socket->write(&wcb1, buf.get(), 0);
1405 socket->write(&wcb2, buf.get(), len1);
1406 socket->write(&wcb3, buf.get() + len1, 0);
1407 socket->write(&wcb4, buf.get() + len1, len2);
1416 rcb.verifyData(buf.get(), len1 + len2);
1422 TEST(AsyncSocketTest, ZeroLengthWritev) {
1427 std::shared_ptr<AsyncSocket>
socket =
1433 acceptedSocket->setReadCB(&rcb);
1435 size_t len1 = 1024 * 1024;
1436 size_t len2 = 1024 * 1024;
1437 std::unique_ptr<char[]> buf(
new char[len1 + len2]);
1438 memset(buf.get(),
'a', len1);
1439 memset(buf.get(),
'b', len2);
1442 constexpr
size_t iovCount = 4;
1443 struct iovec iov[iovCount];
1444 iov[0].iov_base = buf.get();
1445 iov[0].iov_len = len1;
1446 iov[1].iov_base = buf.get() + len1;
1448 iov[2].iov_base = buf.get() + len1;
1449 iov[2].iov_len = len2;
1450 iov[3].iov_base = buf.get() + len1 + len2;
1453 socket->writev(&wcb, iov, iovCount);
1458 rcb.verifyData(buf.get(), len1 + len2);
1471 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1478 socket->connect(&ccb, server.
getAddress(), 30);
1481 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
1491 memset(buf,
'a',
sizeof(buf));
1492 typedef vector<std::shared_ptr<WriteCallback>> WriteCallbackVector;
1493 WriteCallbackVector writeCallbacks;
1495 writeCallbacks.reserve(5);
1496 while (writeCallbacks.size() < 5) {
1499 socket->write(wcb.get(), buf,
sizeof(buf));
1508 writeCallbacks.push_back(wcb);
1515 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1516 it != writeCallbacks.end();
1532 bool immediateReadCalled =
false;
1537 immediateReadCalled =
true;
1545 const size_t maxBufferSz = 100;
1546 const size_t maxReadsPerEvent = 1;
1547 const size_t expectedDataSz = maxBufferSz * 3;
1548 char expectedData[expectedDataSz];
1549 memset(expectedData,
'j', expectedDataSz);
1566 rcbServer.dataAvailableCallback = [&]() {
1567 if (rcbServer.dataRead() == expectedDataSz) {
1569 rcbServer.verifyData(expectedData, expectedDataSz);
1570 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1571 acceptedSocket->close();
1574 acceptedSocket->setReadCB(&rcbServer);
1578 socket.
write(&wcb1, expectedData, expectedDataSz);
1581 rcb.verifyData(expectedData, expectedDataSz);
1591 const size_t maxBufferSz = 100;
1592 const size_t maxReadsPerEvent = 1;
1593 const size_t expectedDataSz = maxBufferSz * 3;
1594 char expectedData[expectedDataSz];
1595 memset(expectedData,
'k', expectedDataSz);
1612 rcbServer.dataAvailableCallback = [&]() {
1613 if (rcbServer.dataRead() == expectedDataSz) {
1615 rcbServer.verifyData(expectedData, expectedDataSz);
1616 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1617 acceptedSocket->close();
1620 acceptedSocket->setReadCB(&rcbServer);
1622 rcb.dataAvailableCallback = [&]() {
1629 socket.
write(&wcb, expectedData, expectedDataSz);
1661 TEST(AsyncSocketTest, ServerAcceptOptions) {
1665 std::shared_ptr<AsyncServerSocket> serverSocket(
1667 serverSocket->bind(0);
1668 serverSocket->listen(16);
1676 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1679 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1681 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1682 serverSocket->startAccepting();
1685 std::shared_ptr<AsyncSocket>
socket(
1698 int fd = acceptCallback.
getEvents()->at(1).fd;
1701 int flags = fcntl(fd, F_GETFL, 0);
1702 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1707 socklen_t valueLength =
sizeof(
value);
1708 int rc =
getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1717 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1720 std::shared_ptr<AsyncServerSocket> serverSocket(
1722 serverSocket->bind(0);
1723 serverSocket->listen(16);
1745 &eventBase, serverAddress));
1751 std::shared_ptr<AsyncSocket> sock3(
1756 std::shared_ptr<AsyncSocket> sock5(
1761 if (cb2Count == 0) {
1762 serverSocket->removeAcceptCallback(&cb3,
nullptr);
1763 serverSocket->removeAcceptCallback(&cb5,
nullptr);
1772 if (cb6Count == 0) {
1773 serverSocket->removeAcceptCallback(&cb4,
nullptr);
1774 std::shared_ptr<AsyncSocket> sock6(
1776 std::shared_ptr<AsyncSocket> sock7(
1778 std::shared_ptr<AsyncSocket> sock8(
1782 serverSocket.reset();
1789 serverSocket->removeAcceptCallback(&cb7,
nullptr);
1792 serverSocket->addAcceptCallback(&cb1, &eventBase);
1793 serverSocket->addAcceptCallback(&cb2, &eventBase);
1794 serverSocket->addAcceptCallback(&cb3, &eventBase);
1795 serverSocket->addAcceptCallback(&cb4, &eventBase);
1796 serverSocket->addAcceptCallback(&cb5, &eventBase);
1797 serverSocket->addAcceptCallback(&cb6, &eventBase);
1798 serverSocket->addAcceptCallback(&cb7, &eventBase);
1799 serverSocket->startAccepting();
1802 std::shared_ptr<AsyncSocket> sock1(
1804 std::shared_ptr<AsyncSocket> sock4(
1859 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1862 std::shared_ptr<AsyncServerSocket> serverSocket(
1864 serverSocket->bind(0);
1865 serverSocket->listen(16);
1871 auto thread_id = std::this_thread::get_id();
1873 CHECK_NE(thread_id, std::this_thread::get_id());
1874 thread_id = std::this_thread::get_id();
1878 ASSERT_EQ(thread_id, std::this_thread::get_id());
1879 serverSocket->removeAcceptCallback(&cb1, &eventBase);
1882 [&]() {
ASSERT_EQ(thread_id, std::this_thread::get_id()); });
1885 serverSocket->addAcceptCallback(&cb1, &eventBase);
1886 serverSocket->startAccepting();
1889 std::shared_ptr<AsyncSocket> sock1(
1893 auto other = std::thread([&]() { eventBase.
loop(); });
1952 TEST(AsyncSocketTest, DestroyCloseTest) {
1960 socket->connect(&ccb, server.
getAddress(), 30);
1963 std::shared_ptr<AsyncSocket> acceptedSocket = server.
acceptAsync(&serverEB);
1965 acceptedSocket->setReadCB(&rcb);
1968 size_t simpleBufLength = 5000000;
1969 char* simpleBuf =
new char[simpleBufLength];
1970 memset(simpleBuf,
'a', simpleBufLength);
1974 int fd = acceptedSocket->getFd();
1976 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1978 acceptedSocket.reset();
1982 ssize_t sz =
read(fd, simpleBuf, simpleBufLength);
1992 TEST(AsyncSocketTest, ServerExistingSocket) {
1998 int fd =
fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2004 serverSocket->useExistingSocket(fd);
2008 serverSocket->bind(address);
2009 serverSocket->listen(16);
2019 int fd =
fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2022 struct sockaddr_in addr;
2023 addr.sin_family = AF_INET;
2025 addr.sin_addr.s_addr = INADDR_ANY;
2027 bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
2035 serverSocket->useExistingSocket(fd);
2036 serverSocket->listen(16);
2040 serverSocket->
getAddress(&serverSocketAddress);
2041 ASSERT_EQ(boundAddress, serverSocketAddress);
2051 int fd =
fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2054 struct sockaddr_in addr;
2055 addr.sin_family = AF_INET;
2057 addr.sin_addr.s_addr = INADDR_ANY;
2059 bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
2069 serverSocket->useExistingSocket(fd);
2073 serverSocket->
getAddress(&serverSocketAddress);
2074 ASSERT_EQ(boundAddress, serverSocketAddress);
2081 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2085 std::shared_ptr<AsyncServerSocket> serverSocket(
2091 serverSocket->bind(serverAddress);
2092 serverSocket->listen(16);
2098 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2101 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2103 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2104 serverSocket->startAccepting();
2107 std::shared_ptr<AsyncSocket>
socket(
2120 int fd = acceptCallback.
getEvents()->at(1).fd;
2123 int flags = fcntl(fd, F_GETFL, 0);
2124 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2127 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2132 std::shared_ptr<AsyncServerSocket> serverSocket(
2134 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2135 serverSocket->bind(0);
2136 serverSocket->listen(16);
2144 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2147 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2149 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2150 serverSocket->startAccepting();
2153 std::shared_ptr<AsyncSocket>
socket(
2170 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2175 std::shared_ptr<AsyncServerSocket> serverSocket(
2177 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2178 serverSocket->bind(0);
2179 serverSocket->listen(16);
2187 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2190 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2192 bool acceptStartedFlag{
false};
2194 [&acceptStartedFlag]() { acceptStartedFlag =
true; });
2195 bool acceptStoppedFlag{
false};
2197 [&acceptStoppedFlag]() { acceptStoppedFlag =
true; });
2198 serverSocket->addAcceptCallback(&acceptCallback,
nullptr);
2199 serverSocket->startAccepting();
2202 std::shared_ptr<AsyncSocket>
socket(
2221 TEST(AsyncSocketTest, CallbackInSecondaryEventBase) {
2226 std::shared_ptr<AsyncServerSocket> serverSocket(
2228 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2229 serverSocket->bind(0);
2230 serverSocket->listen(16);
2240 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2245 [&] { serverSocket->removeAcceptCallback(&acceptCallback,
nullptr); });
2247 std::atomic<bool> acceptStartedFlag{
false};
2251 serverSocket->addAcceptCallback(&acceptCallback, cobThread.
getEventBase());
2252 serverSocket->startAccepting();
2255 std::shared_ptr<AsyncSocket>
socket(
2277 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2285 serverSocket->bind(0);
2286 serverSocket->listen(16);
2297 ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2301 serverSocket->removeAcceptCallback(&acceptCallback,
nullptr);
2307 [&] { serverSocket->removeAcceptCallback(&acceptCallback,
nullptr); });
2309 serverSocket->addAcceptCallback(&acceptCallback, cobThread.
getEventBase());
2310 serverSocket->startAccepting();
2325 TEST(AsyncSocketTest, BufferTest) {
2334 char buf[100 * 1024];
2335 memset(buf,
'c',
sizeof(buf));
2338 socket->setBufferCallback(&bcb);
2355 TEST(AsyncSocketTest, BufferCallbackKill) {
2364 char buf[100 * 1024];
2365 memset(buf,
'c',
sizeof(buf));
2367 socket->setBufferCallback(&bcb);
2387 TEST(AsyncSocketTest, ConnectTFO) {
2394 socket->enableTFO();
2396 socket->connect(&cb, server.
getAddress(), 30);
2398 std::array<uint8_t, 128> buf;
2399 memset(buf.data(),
'a', buf.size());
2401 std::array<uint8_t, 3>
readBuf;
2405 auto acceptedSocket = server.
accept();
2406 acceptedSocket->write(buf.data(), buf.size());
2407 acceptedSocket->flush();
2408 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2409 acceptedSocket->close();
2415 EXPECT_LE(0, socket->getConnectTime().count());
2416 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2422 socket->writeChain(&write, sendBuf->clone());
2423 socket->setReadCB(&rcb);
2429 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2434 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2437 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2444 socket->enableTFO();
2446 socket->connect(&cb, server.
getAddress(), 30);
2448 socket->setReadCB(&rcb);
2450 std::array<uint8_t, 128> buf;
2451 memset(buf.data(),
'a', buf.size());
2453 std::array<uint8_t, 3>
readBuf;
2457 auto acceptedSocket = server.
accept();
2458 acceptedSocket->write(buf.data(), buf.size());
2459 acceptedSocket->flush();
2460 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2461 acceptedSocket->close();
2467 EXPECT_LE(0, socket->getConnectTime().count());
2468 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2473 socket->writeChain(&write, sendBuf->clone());
2479 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2484 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2490 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2495 socket->enableTFO();
2500 socket->connect(&cb, addr, 30);
2511 if (!socket->getTFOFinished()) {
2521 EXPECT_LE(0, socket->getConnectTime().count());
2522 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2529 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2535 socket->enableTFO();
2538 socket->connect(&ccb, server.
getAddress(), 30);
2541 std::array<char, 128> buf;
2542 memset(buf.data(),
'a', buf.size());
2559 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2565 socket->enableTFO();
2568 socket->connect(&ccb, server.
getAddress(), 30);
2584 using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2588 MOCK_METHOD3(tfoSendMsg, ssize_t(
int fd,
struct msghdr* msg,
int msg_flags));
2591 TEST(AsyncSocketTest, TestTFOUnsupported) {
2596 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2610 socket->writeChain(&write, sendBuf->clone());
2613 std::array<uint8_t, 128> buf;
2614 memset(buf.data(),
'a', buf.size());
2616 std::array<uint8_t, 3>
readBuf;
2619 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
2620 acceptedSocket->write(buf.data(), buf.size());
2621 acceptedSocket->flush();
2622 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2623 acceptedSocket->close();
2632 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2640 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2643 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2649 .WillOnce(
Invoke([&](
int fd,
struct msghdr*,
int) {
2650 sockaddr_storage
addr;
2652 int ret =
connect(fd, (
const struct sockaddr*)&addr, len);
2653 LOG(
INFO) <<
"connecting the socket " << fd <<
" : " << ret <<
" : " 2660 socket->connect(&cb, fakeAddr, 30);
2666 if (
socket->getTFOFinished()) {
2681 EXPECT_EQ(std::chrono::milliseconds(30),
socket->getConnectTimeout());
2685 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2701 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2706 socket->connect(&ccb, addr, 1);
2722 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2727 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2738 .WillOnce(
Invoke([&](
int fd,
struct msghdr*,
int) {
2739 sockaddr_storage
addr;
2741 return connect(fd, (
const struct sockaddr*)&addr, len);
2745 socket->writeChain(&write, sendBuf->clone());
2748 std::array<uint8_t, 128> buf;
2749 memset(buf.data(),
'a', buf.size());
2751 std::array<uint8_t, 3>
readBuf;
2754 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
2755 acceptedSocket->write(buf.data(), buf.size());
2756 acceptedSocket->flush();
2757 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2758 acceptedSocket->close();
2764 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2775 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2791 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2796 socket->connect(&ccb, addr, 1);
2803 .WillOnce(
Invoke([&](
int fd,
struct msghdr*,
int) {
2804 sockaddr_storage addr2;
2806 return connect(fd, (
const struct sockaddr*)&addr2, len);
2816 TEST(AsyncSocketTest, TestTFOEagain) {
2821 auto socket = MockAsyncTFOSocket::UniquePtr(
new MockAsyncTFOSocket(&evb));
2840 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2847 socket->enableTFO();
2849 socket->connect(&cb, server.
getAddress(), 30);
2851 std::array<uint8_t, 128> buf;
2852 memset(buf.data(),
'a', buf.size());
2854 constexpr
size_t len = 10 * 1024;
2856 sendBuf->append(len);
2857 std::array<uint8_t, len>
readBuf;
2860 auto acceptedSocket = server.
accept();
2861 acceptedSocket->write(buf.data(), buf.size());
2862 acceptedSocket->flush();
2863 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2864 acceptedSocket->close();
2870 EXPECT_LE(0, socket->getConnectTime().count());
2871 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2877 socket->writeChain(&write, sendBuf->clone());
2878 socket->setReadCB(&rcb);
2884 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2889 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2892 #endif // FOLLY_ALLOW_TFO 2900 TEST(AsyncSocketTest, EvbCallbacks) {
2901 auto cb = std::make_unique<MockEvbChangeCallback>();
2906 EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
2907 EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
2909 socket->setEvbChangedCallback(
std::move(cb));
2910 socket->detachEventBase();
2911 socket->attachEventBase(&evb);
2914 TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
2922 socket->connect(&cb, server.
getAddress(), 30);
2927 EXPECT_LE(0, socket->getConnectTime().count());
2928 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2932 socket->setReadCB(&rcb);
2934 auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
2936 EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
2937 EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
2939 socket->setEvbChangedCallback(
std::move(cbEvbChg));
2941 socket->detachEventBase();
2942 socket->attachEventBase(&evb);
2947 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 2950 enum SOF_TIMESTAMPING {
2951 SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
2952 SOF_TIMESTAMPING_OPT_ID = (1 << 7),
2953 SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
2954 SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
2955 SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
2960 TestErrMessageCallback()
2963 void errMessage(
const cmsghdr& cmsg)
noexcept override {
2964 if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
2966 checkResetCallback();
2968 (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
2969 (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
2971 checkResetCallback();
2975 void errMessageError(
2980 void checkResetCallback()
noexcept {
2981 if (
socket_ !=
nullptr && resetAfter_ != -1 &&
2982 gotTimestamp_ + gotByteSeq_ == resetAfter_) {
2983 socket_->setErrMessageCB(
nullptr);
2989 int gotTimestamp_{0};
2991 int resetAfter_{-1};
2994 TEST(AsyncSocketTest, ErrMessageCallback) {
3002 socket->connect(&ccb, server.
getAddress(), 30);
3003 LOG(
INFO) <<
"Client socket fd=" << socket->getFd();
3014 socket->setReadCB(&rcb);
3017 TestErrMessageCallback errMsgCB;
3018 socket->setErrMessageCB(&errMsgCB);
3020 socket->getErrMessageCallback(),
3023 errMsgCB.socket_ = socket.get();
3024 errMsgCB.resetAfter_ = 3;
3028 int flags = SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY |
3029 SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_CMSG |
3030 SOF_TIMESTAMPING_TX_SCHED;
3035 std::vector<uint8_t> wbuf(128,
'a');
3038 socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
3039 socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
3042 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
3043 LOG(
INFO) <<
"Server socket fd=" << acceptedSocket->getSocketFD();
3050 std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
3051 uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
3052 ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
3056 acceptedSocket->close();
3068 errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
3070 #endif // FOLLY_HAVE_MSG_ERRQUEUE 3077 socket->connect(
nullptr, server.
getAddress(), 30);
3086 peekCallback.dataAvailableCallback = [&]() {
3087 peekCallback.verifyData(
"he", 2);
3090 acceptedSocket->setReadCB(
nullptr);
3091 acceptedSocket->setReadCB(&readCallback);
3093 readCallback.dataAvailableCallback = [&]() {
3094 if (readCallback.dataRead() == 5) {
3095 readCallback.verifyData(
"hello", 5);
3096 acceptedSocket->setReadCB(
nullptr);
3100 acceptedSocket->setReadCB(&peekCallback);
3110 socket->connect(
nullptr, server.
getAddress(), 30);
3119 peekCallback.dataAvailableCallback = [&]() {
3120 peekCallback.verifyData(
"hello", 5);
3122 acceptedSocket->setReadCB(&readCallback);
3124 readCallback.dataAvailableCallback = [&]() {
3125 readCallback.verifyData(
"hello", 5);
3126 acceptedSocket->setReadCB(
nullptr);
3129 acceptedSocket->setReadCB(&peekCallback);
3139 socket->connect(
nullptr, server.
getAddress(), 30);
3149 peekCallback.dataAvailableCallback = [&]() {
3150 peekCallback.verifyData(
"hello", 5);
3152 acceptedSocket->setReadCB(&smallReadCallback);
3154 smallReadCallback.dataAvailableCallback = [&]() {
3155 smallReadCallback.verifyData(
"hel", 3);
3156 acceptedSocket->setReadCB(&normalReadCallback);
3158 normalReadCallback.dataAvailableCallback = [&]() {
3159 normalReadCallback.verifyData(
"lo", 2);
3160 acceptedSocket->setReadCB(
nullptr);
3163 acceptedSocket->setReadCB(&peekCallback);
3173 socket->connect(
nullptr, server.
getAddress(), 30);
3178 auto acceptedSocket =
3184 peekCallback.dataAvailableCallback = [&]() {
3185 peekCallback.verifyData(
"hel", 3);
3187 acceptedSocket->setReadCB(
nullptr);
3190 takeoverSocket->setReadCB(&readCallback);
3192 readCallback.dataAvailableCallback = [&]() {
3193 readCallback.verifyData(
"hello", 5);
3194 takeoverSocket->setReadCB(
nullptr);
3197 acceptedSocket->setReadCB(&peekCallback);
3203 TEST(AsyncSocketTest, SendMessageFlags) {
3206 MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE, 0,
nullptr);
3213 socket->connect(&ccb, server.
getAddress(), 30);
3214 std::shared_ptr<BlockingSocket> acceptedSocket = server.
accept();
3220 socket->setSendMsgParamCB(&sendMsgCB);
3221 ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
3225 std::vector<uint8_t> buf(128,
'a');
3227 sendMsgCB.
reset(MSG_DONTWAIT | MSG_NOSIGNAL);
3228 socket->write(&wcb, buf.data(), buf.size());
3236 sendMsgCB.
reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
3237 socket->write(&wcb, buf.data(), buf.size());
3244 std::vector<uint8_t> readbuf(2 * buf.size());
3245 uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
3246 ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
3250 acceptedSocket->close();
3257 TEST(AsyncSocketTest, SendMessageAncillaryData) {
3281 int tmpfd = file.
fd();
3282 ASSERT_NE(tmpfd, -1) <<
"Failed to open a temporary file";
3285 write(tmpfd, magicString.c_str(), magicString.length()),
3286 magicString.length());
3291 char control[CMSG_SPACE(
sizeof(
int))];
3294 s_u.cmh.cmsg_len = CMSG_LEN(
sizeof(
int));
3295 s_u.cmh.cmsg_level = SOL_SOCKET;
3296 s_u.cmh.cmsg_type = SCM_RIGHTS;
3297 memcpy(CMSG_DATA(&s_u.cmh), &tmpfd,
sizeof(
int));
3301 MSG_DONTWAIT | MSG_NOSIGNAL,
sizeof(s_u.control), s_u.control);
3302 socket->setSendMsgParamCB(&sendMsgCB);
3308 socket->write(&wcb, &s_data,
sizeof(s_data));
3314 char control[CMSG_SPACE(
sizeof(
int))];
3321 msgh.msg_control = r_u.control;
3322 msgh.msg_controllen =
sizeof(r_u.control);
3323 msgh.msg_name =
nullptr;
3324 msgh.msg_namelen = 0;
3325 msgh.msg_iov = &iov;
3326 msgh.msg_iovlen = 1;
3327 iov.iov_base = &r_data;
3328 iov.iov_len =
sizeof(r_data);
3334 ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(
sizeof(
int)));
3335 ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
3336 ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
3339 memcpy(&fd, CMSG_DATA(&r_u.cmh),
sizeof(
int));
3345 std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
3352 magicString.length(),
3353 read(fd, transferredMagicString.data(), transferredMagicString.size()));
3355 magicString.begin(), magicString.end(), transferredMagicString.begin()));
3358 TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
3379 EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
3380 EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
3386 TestErrMessageCallback errMsgCB;
3388 socket->setErrMessageCB(&errMsgCB);
3389 ASSERT_EQ(socket->getErrMessageCallback(),
nullptr);
3391 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 3396 char control[CMSG_SPACE(
sizeof(
int))];
3403 msgh.msg_control = r_u.control;
3404 msgh.msg_controllen =
sizeof(r_u.control);
3405 msgh.msg_name =
nullptr;
3406 msgh.msg_namelen = 0;
3407 msgh.msg_iov = &iov;
3408 msgh.msg_iovlen = 1;
3409 iov.iov_base = &recv_data;
3410 iov.iov_len =
sizeof(recv_data);
3414 EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
3418 int test_data = 123456;
3420 socket->write(&wcb, &test_data,
sizeof(test_data));
3424 #endif // FOLLY_HAVE_MSG_ERRQUEUE 3427 TEST(AsyncSocketTest, V6TosReflectTest) {
3431 std::shared_ptr<AsyncServerSocket> serverSocket(
3433 folly::IPAddress ip(
"::1");
3434 std::vector<folly::IPAddress> serverIp;
3435 serverIp.push_back(ip);
3436 serverSocket->bind(serverIp, 0);
3437 serverSocket->listen(16);
3442 serverSocket->setTosReflect(
true);
3448 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3451 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3453 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
3454 serverSocket->startAccepting();
3457 auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
3464 optionMap.insert({v6Opts, 0x2c});
3466 clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
3469 std::shared_ptr<AsyncSocket>
socket(
nullptr);
3471 clientThread(socket, &cb, &eventBase, serverAddress);
3477 int fd = acceptCallback.
getEvents()->at(1).fd;
3480 socklen_t valueLength =
sizeof(
value);
3481 int rc =
getsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &value, &valueLength);
3486 TEST(AsyncSocketTest, V4TosReflectTest) {
3490 std::shared_ptr<AsyncServerSocket> serverSocket(
3492 folly::IPAddress ip(
"127.0.0.1");
3493 std::vector<folly::IPAddress> serverIp;
3494 serverIp.push_back(ip);
3495 serverSocket->bind(serverIp, 0);
3496 serverSocket->listen(16);
3501 serverSocket->setTosReflect(
true);
3507 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3510 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3512 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
3513 serverSocket->startAccepting();
3516 auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
3523 optionMap.insert({v4Opts, 0x2c});
3525 clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
3528 std::shared_ptr<AsyncSocket>
socket(
nullptr);
3530 clientThread(socket, &cb, &eventBase, serverAddress);
3536 int fd = acceptCallback.
getEvents()->at(1).fd;
3539 socklen_t valueLength =
sizeof(
value);
3540 int rc =
getsockopt(fd, IPPROTO_IP, IP_TOS, &value, &valueLength);
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define EXPECT_LE(val1, val2)
EventBase * getEventBase() const
#define ASSERT_GE(val1, val2)
#define ASSERT_GT(val1, val2)
void setFromPath(StringPiece path)
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
void write(const T &in, folly::io::Appender &appender)
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
void timeoutExpired() noexceptoverride
static std::unique_ptr< IOBuf > create(std::size_t capacity)
folly::AsyncSocketException exception
unsigned int getBackoffEnded() const
EventBase * getEventBase() const override
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
virtual bool isClosedByPeer() const
#define EXPECT_EQ(val1, val2)
std::shared_ptr< BlockingSocket > accept(int timeout=50)
folly::AsyncSocketException exception
std::deque< EventInfo > * getEvents()
void appendChain(std::unique_ptr< IOBuf > &&iobuf)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
EventBase * getEventBase()
unsigned int getBackoffError() const
static bool isIPv6Enabled()
std::vector< TFOState > getTestingValues()
#define ASSERT_LE(val1, val2)
INSTANTIATE_TEST_CASE_P(ConnectTests, AsyncSocketConnectTest,::testing::ValuesIn(getTestingValues()))
std::vector< Buffer > buffers
Gen seq(Value first, Value last)
unsigned int getConnectionAcceptedError() const
—— Concurrent Priority Queue Implementation ——
std::unique_ptr< IOBuf > clone() const
auto msvcSuppressAbortOnInvalidParams(Func func) -> decltype(func())
requires E e noexcept(noexcept(s.error(std::move(e))))
#define EXPECT_GE(val1, val2)
void getAddress(SocketAddress *addressReturn) const override
unsigned int getConnectionDropped() const
DelayedWrite(const std::shared_ptr< AsyncSocket > &socket, unique_ptr< IOBuf > &&bufs, AsyncTransportWrapper::WriteCallback *wcb, bool cork, bool lastWrite=false)
std::shared_ptr< AsyncSocket > socket_
static bool isIPv4Enabled()
void testConnectOptWrite(size_t size1, size_t size2, bool close=false)
void setPort(uint16_t port)
AsyncTransportWrapper::WriteCallback * wcb_
unique_ptr< IOBuf > bufs_
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
#define MOCK_METHOD3(m,...)
VoidCallback successCallback
FOLLY_ALWAYS_INLINE bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout, const WaitOptions &opt=wait_options()) noexcept
virtual void handleRead() noexcept
void setMaxReadsPerEvent(uint16_t maxReads)
std::map< OptionKey, int > OptionMap
VoidCallback successCallback
void runInLoop(LoopCallback *callback, bool thisIteration=false)
static constexpr const char * kGooglePublicDnsAAddrIPv6
bool hasBufferCleared() const
bool loopOnce(int flags=0)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
size_t read(T &out, folly::io::Cursor &cursor)
auto end(TestAdlIterable &instance)
void setAcceptStartedFn(const std::function< void()> &fn)
AsyncServerSocket::UniquePtr socket_
TEST_P(AsyncSocketConnectTest, ConnectAndWrite)
bool runInEventBaseThread(void(*fn)(T *), T *arg)
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
NetworkSocket socket(int af, int type, int protocol)
socklen_t getAddress(sockaddr_storage *addr) const
int listen(NetworkSocket s, int backlog)
virtual void startAccepting()
unsigned int getBackoffStarted() const
int acceptFD(int timeout=50)
void tmpDisableReads(AsyncSocket *socket, ReadCallback *rcb)
TEST(ProgramOptionsTest, Errors)
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
#define EXPECT_TRUE(condition)
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
void setReadCB(ReadCallback *callback) override
#define MOCK_METHOD1(m,...)
const folly::SocketAddress & getAddress() const
void setFromLocalAddress(int socket)
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
static constexpr const char * kGooglePublicDnsAAddrIPv4
EventBase * getEventBase() const override
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
bool scheduleTimeout(uint32_t milliseconds)
unsigned int getConnectionEnqueuedForAcceptCallback() const
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define ASSERT_NE(val1, val2)
void serverSocketSanityTest(AsyncServerSocket *serverSocket)
#define ASSERT_FALSE(condition)
#define EXPECT_FALSE(condition)
virtual bool isClosedBySelf() const
PolymorphicAction< internal::SetErrnoAndReturnAction< T > > SetErrnoAndReturn(int errval, T result)
Range< const char * > StringPiece
AsyncSocketExceptionType getType() const noexcept
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
#define ASSERT_TRUE(condition)
int close(NetworkSocket s)
ThreadPoolListHook * addr
void verifyConnection(const char *buf, size_t len)
int apply(int fd, int val) const
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
#define T_CHECK_TIMEOUT(start, end, expectedMS,...)
option(BUILD_SHARED_LIBS"Build shared libraries (DLLs)."OFF) option(gmock_build_tests"Build all of Google Mock's own tests."OFF) if(EXISTS"$
unsigned int getConnectionAccepted() const
void setAcceptStoppedFn(const std::function< void()> &fn)
unsigned int getConnectionDequeuedByAcceptCallback() const
void append(std::size_t amount)
int socketpair(int domain, int type, int protocol, NetworkSocket sv[2])
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)
std::atomic< size_t > bytesWritten