proxygen
AsyncSocketTest2.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2010-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/Random.h>
21 #include <folly/SocketAddress.h>
26 
28 #include <folly/io/IOBuf.h>
37 
38 #include <fcntl.h>
39 #include <sys/types.h>
40 #include <iostream>
41 #include <memory>
42 #include <thread>
43 
44 using std::cerr;
45 using std::endl;
46 using std::min;
47 using std::string;
48 using std::unique_ptr;
49 using std::vector;
50 using std::chrono::milliseconds;
51 
52 using namespace folly;
53 using namespace folly::test;
54 using namespace testing;
55 
57 
// Timer-driven write helper: when the timeout fires it calls writeChain() on
// the stored socket with the stored buffer chain and, if lastWrite_ is set,
// follows it with shutdownWrite().
// NOTE(review): the embedded listing numbers skip 60, 63, 75 and 84, so the
// constructor's name line, one constructor parameter (presumably `wcb`), the
// declaration of `flags`, and the `wcb_` member are absent from this listing.
// Restore them from the upstream file before compiling.
58 class DelayedWrite : public AsyncTimeout {
59  public:
// NOTE(review): constructor header (embedded line 60) is missing here.
61  const std::shared_ptr<AsyncSocket>& socket,
62  unique_ptr<IOBuf>&& bufs,
// NOTE(review): a parameter (embedded line 63, presumably `wcb`) is missing here.
64  bool cork,
65  bool lastWrite = false)
66  : AsyncTimeout(socket->getEventBase()),
67  socket_(socket),
68  bufs_(std::move(bufs)),
69  wcb_(wcb),
70  cork_(cork),
71  lastWrite_(lastWrite) {}
72 
73  private:
// Invoked when the timer expires: performs the deferred write.
74  void timeoutExpired() noexcept override {
// NOTE(review): the declaration of `flags` (embedded line 75) is missing;
// presumably it is derived from cork_ — confirm against upstream.
76  socket_->writeChain(wcb_, std::move(bufs_), flags);
77  if (lastWrite_) {
78  socket_->shutdownWrite();
79  }
80  }
81 
82  std::shared_ptr<AsyncSocket> socket_;
83  unique_ptr<IOBuf> bufs_;
// NOTE(review): the `wcb_` member declaration (embedded line 84) is missing here.
85  bool cork_;
86  bool lastWrite_;
87 };
88 
90 // connect() tests
92 
// Connects an AsyncSocket to a local TestServer with a 30 ms connect timeout,
// runs the event loop, and checks the socket's connect-time accessors against
// steady_clock timestamps taken before and after the attempt.
96 TEST(AsyncSocketTest, Connect) {
97  // Start listening on a local port
98  TestServer server;
99 
100  // Connect using an AsyncSocket
101  EventBase evb;
102  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
103  ConnCallback cb;
104  const auto startedAt = std::chrono::steady_clock::now();
105  socket->connect(&cb, server.getAddress(), 30);
106 
107  evb.loop();
108  const auto finishedAt = std::chrono::steady_clock::now();
109 
// NOTE(review): embedded line 110 is absent from this listing; a statement
// (likely a check on cb.state) may be missing here — confirm upstream.
111  EXPECT_LE(0, socket->getConnectTime().count());
112  EXPECT_GE(socket->getConnectStartTime(), startedAt);
113  EXPECT_LE(socket->getConnectStartTime(), socket->getConnectEndTime());
114  EXPECT_LE(socket->getConnectEndTime(), finishedAt);
115  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
116 }
117 
/// Test parameter selecting whether a parameterized connect test calls
/// enableTFO() on its socket before connecting.
enum class TFOState {
  DISABLED,
  ENABLED,
};
122 
123 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
124 
125 std::vector<TFOState> getTestingValues() {
126  std::vector<TFOState> vals;
127  vals.emplace_back(TFOState::DISABLED);
128 
129 #if FOLLY_ALLOW_TFO
130  vals.emplace_back(TFOState::ENABLED);
131 #endif
132  return vals;
133 }
134 
// Tail of the parameterized-test instantiation that feeds getTestingValues()
// to the TEST_P cases.
// NOTE(review): embedded lines 135 and 137 are absent from this listing —
// presumably the instantiation macro name and the fixture name
// (AsyncSocketConnectTest is the only TestWithParam fixture visible here).
// Restore from upstream before compiling.
136  ConnectTests,
138  ::testing::ValuesIn(getTestingValues()));
139 
// Attempts to connect to 127.0.0.1:65535, where no listener is expected, with
// a 30 ms timeout, then checks the connect-time accessors.
143 TEST(AsyncSocketTest, ConnectRefused) {
144  EventBase evb;
145 
146  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
147 
148  // Hopefully nothing is actually listening on this address
149  folly::SocketAddress addr("127.0.0.1", 65535);
150  ConnCallback cb;
151  socket->connect(&cb, addr, 30);
152 
153  evb.loop();
154 
// NOTE(review): embedded lines 155-156 are absent from this listing; checks on
// the connect callback's result may be missing here — confirm upstream.
157  EXPECT_LE(0, socket->getConnectTime().count());
158  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
159 }
160 
// Attempts a connect with a 1 ms timeout to port 65535 of a host that is
// expected not to answer, then verifies the peer address is still retrievable
// and the connect-time accessors are populated.
// NOTE(review): this listing has several gaps (embedded lines 175-178,
// 186-187, 193 and 198); `host` and `peer` are used but their declarations
// are not visible, and the `if` that owns the SKIP() block has lost its
// header. Restore from upstream before compiling.
164 TEST(AsyncSocketTest, ConnectTimeout) {
165  EventBase evb;
166 
167  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
168 
169  // Try connecting to server that won't respond.
170  //
171  // This depends somewhat on the network where this test is run.
172  // Hopefully this IP will be routable but unresponsive.
173  // (Alternatively, we could try listening on a local raw socket, but that
174  // normally requires root privileges.)
// NOTE(review): embedded lines 175-178 (the start of the expression that
// initializes `host`) are missing; only its trailing `: nullptr;` survives.
179  : nullptr;
180  SocketAddress addr(host, 65535);
181  ConnCallback cb;
182  socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
183 
184  evb.loop();
185 
// NOTE(review): embedded lines 186-187 (presumably the condition guarding the
// skip below) are missing.
188  // This can happen if we could not route to the IP address picked above.
189  // In this case the connect will fail immediately rather than timing out.
190  // Just skip the test in this case.
191  SKIP() << "do not have a routable but unreachable IP address";
192  }
// NOTE(review): embedded line 193 is missing here.
194 
195  // Verify that we can still get the peer address after a timeout.
196  // Use case is if the client was created from a client pool, and we want
197  // to log which peer failed.
// NOTE(review): embedded line 198 (presumably the declaration of `peer`) is
// missing here.
199  socket->getPeerAddress(&peer);
200  ASSERT_EQ(peer, addr);
201  EXPECT_LE(0, socket->getConnectTime().count());
202  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
203 }
204 
// Calls connect() and then write() of 128 bytes before running the event
// loop, then closes the socket and has the server verify it received the data.
// Runs with TFO enabled or disabled according to the test parameter.
209 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
210  TestServer server;
211 
212  // connect()
213  EventBase evb;
214  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
215 
216  if (GetParam() == TFOState::ENABLED) {
217  socket->enableTFO();
218  }
219 
220  ConnCallback ccb;
221  socket->connect(&ccb, server.getAddress(), 30);
222 
223  // write()
224  char buf[128];
225  memset(buf, 'a', sizeof(buf));
226  WriteCallback wcb;
227  socket->write(&wcb, buf, sizeof(buf));
228 
229  // Loop. We don't bother accepting on the server socket yet.
230  // The kernel should be able to buffer the write request so it can succeed.
231  evb.loop();
232 
// NOTE(review): embedded lines 233-234 are absent from this listing; checks on
// ccb/wcb state may be missing here — confirm upstream.
235 
236  // Make sure the server got a connection and received the data
237  socket->close();
238  server.verifyConnection(buf, sizeof(buf));
239 
240  ASSERT_TRUE(socket->isClosedBySelf());
241  ASSERT_FALSE(socket->isClosedByPeer());
242  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
243 }
244 
// Calls connect() with a null connect callback, then writes 128 bytes and has
// the server verify the data, to show the socket works without a callback.
248 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
249  TestServer server;
250 
251  // connect()
252  EventBase evb;
253  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
254  if (GetParam() == TFOState::ENABLED) {
255  socket->enableTFO();
256  }
257 
258  socket->connect(nullptr, server.getAddress(), 30);
259 
260  // write some data, just so we have some way of verifying
261  // that the socket works correctly after connecting
262  char buf[128];
263  memset(buf, 'a', sizeof(buf));
264  WriteCallback wcb;
265  socket->write(&wcb, buf, sizeof(buf));
266 
267  evb.loop();
268 
// NOTE(review): embedded line 269 is absent from this listing; a check on
// wcb.state may be missing here — confirm upstream.
270 
271  // Make sure the server got a connection and received the data
272  socket->close();
273  server.verifyConnection(buf, sizeof(buf));
274 
275  ASSERT_TRUE(socket->isClosedBySelf());
276  ASSERT_FALSE(socket->isClosedByPeer());
277 }
278 
// Calls connect(), write() and close() back to back before the event loop
// runs, then has the server verify that the 128 bytes still arrived.
285 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
286  TestServer server;
287 
288  // connect()
289  EventBase evb;
290  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
291  if (GetParam() == TFOState::ENABLED) {
292  socket->enableTFO();
293  }
294  ConnCallback ccb;
295  socket->connect(&ccb, server.getAddress(), 30);
296 
297  // write()
298  char buf[128];
299  memset(buf, 'a', sizeof(buf));
300  WriteCallback wcb;
301  socket->write(&wcb, buf, sizeof(buf));
302 
303  // close()
304  socket->close();
305 
306  // Loop. We don't bother accepting on the server socket yet.
307  // The kernel should be able to buffer the write request so it can succeed.
308  evb.loop();
309 
// NOTE(review): embedded lines 310-311 are absent from this listing; checks on
// ccb/wcb state may be missing here — confirm upstream.
312 
313  // Make sure the server got a connection and received the data
314  server.verifyConnection(buf, sizeof(buf));
315 
316  ASSERT_TRUE(socket->isClosedBySelf());
317  ASSERT_FALSE(socket->isClosedByPeer());
318 }
319 
// Calls close() while the connect is still pending. Returns early (without
// testing anything) if the connect had already succeeded synchronously.
323 TEST(AsyncSocketTest, ConnectAndClose) {
324  TestServer server;
325 
326  // Connect using an AsyncSocket
327  EventBase evb;
328  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
329  ConnCallback ccb;
330  socket->connect(&ccb, server.getAddress(), 30);
331 
332  // Hopefully the connect didn't succeed immediately.
333  // If it did, we can't exercise the close-while-connecting code path.
334  if (ccb.state == STATE_SUCCEEDED) {
335  LOG(INFO) << "connect() succeeded immediately; aborting test "
336  "of close-during-connect behavior";
337  return;
338  }
339 
340  socket->close();
341 
342  // Loop, although there shouldn't be anything to do.
343  evb.loop();
344 
345  // Make sure the connection was aborted
// NOTE(review): embedded line 346 (the check this comment describes) is absent
// from this listing — confirm upstream.
347 
348  ASSERT_TRUE(socket->isClosedBySelf());
349  ASSERT_FALSE(socket->isClosedByPeer());
350 }
351 
// Calls closeNow() while the connect is still pending. Returns early (without
// testing anything) if the connect had already succeeded synchronously.
357 TEST(AsyncSocketTest, ConnectAndCloseNow) {
358  TestServer server;
359 
360  // Connect using an AsyncSocket
361  EventBase evb;
362  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
363  ConnCallback ccb;
364  socket->connect(&ccb, server.getAddress(), 30);
365 
366  // Hopefully the connect didn't succeed immediately.
367  // If it did, we can't exercise the close-while-connecting code path.
368  if (ccb.state == STATE_SUCCEEDED) {
369  LOG(INFO) << "connect() succeeded immediately; aborting test "
370  "of closeNow()-during-connect behavior";
371  return;
372  }
373 
374  socket->closeNow();
375 
376  // Loop, although there shouldn't be anything to do.
377  evb.loop();
378 
379  // Make sure the connection was aborted
// NOTE(review): embedded line 380 (the check this comment describes) is absent
// from this listing — confirm upstream.
381 
382  ASSERT_TRUE(socket->isClosedBySelf());
383  ASSERT_FALSE(socket->isClosedByPeer());
384 }
385 
// Queues a 128-byte write while the connect is pending and then calls
// closeNow(). Returns early if the connect had already succeeded synchronously.
392 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
393  TestServer server;
394 
395  // connect()
396  EventBase evb;
397  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
398  ConnCallback ccb;
399  socket->connect(&ccb, server.getAddress(), 30);
400 
401  // Hopefully the connect didn't succeed immediately.
402  // If it did, we can't exercise the close-while-connecting code path.
403  if (ccb.state == STATE_SUCCEEDED) {
404  LOG(INFO) << "connect() succeeded immediately; aborting test "
405  "of write-during-connect behavior";
406  return;
407  }
408 
409  // write()
410  char buf[128];
411  memset(buf, 'a', sizeof(buf));
412  WriteCallback wcb;
413  socket->write(&wcb, buf, sizeof(buf));
414 
415  // close()
416  socket->closeNow();
417 
418  // Loop, although there shouldn't be anything to do.
419  evb.loop();
420 
// NOTE(review): embedded lines 421-422 are absent from this listing; checks on
// ccb/wcb state may be missing here — confirm upstream.
423 
424  ASSERT_TRUE(socket->isClosedBySelf());
425  ASSERT_FALSE(socket->isClosedByPeer());
426 }
427 
// Installs a read callback right after connect(), has the accepted server-side
// socket write 128 bytes and close, then checks that the read callback
// received exactly one 128-byte buffer with matching contents.
431 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
432  TestServer server;
433 
434  // connect()
435  EventBase evb;
436  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
437  if (GetParam() == TFOState::ENABLED) {
438  socket->enableTFO();
439  }
440 
441  ConnCallback ccb;
442  socket->connect(&ccb, server.getAddress(), 30);
443 
444  ReadCallback rcb;
445  socket->setReadCB(&rcb);
446 
447  if (GetParam() == TFOState::ENABLED) {
448  // Trigger a connection
449  socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
450  }
451 
452  // Even though we haven't looped yet, we should be able to accept
453  // the connection and send data to it.
454  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
455  uint8_t buf[128];
456  memset(buf, 'a', sizeof(buf));
457  acceptedSocket->write(buf, sizeof(buf));
458  acceptedSocket->flush();
459  acceptedSocket->close();
460 
461  // Loop, although there shouldn't be anything to do.
462  evb.loop();
463 
// NOTE(review): embedded line 464 is absent from this listing; a check on
// ccb.state may be missing here — confirm upstream.
465  ASSERT_EQ(rcb.buffers.size(), 1);
466  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
467  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
468 
469  ASSERT_FALSE(socket->isClosedBySelf());
470  ASSERT_FALSE(socket->isClosedByPeer());
471 }
472 
477 TEST(AsyncSocketTest, ConnectReadAndClose) {
478  TestServer server;
479 
480  // connect()
481  EventBase evb;
482  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
483  ConnCallback ccb;
484  socket->connect(&ccb, server.getAddress(), 30);
485 
486  // Hopefully the connect didn't succeed immediately.
487  // If it did, we can't exercise the close-while-connecting code path.
488  if (ccb.state == STATE_SUCCEEDED) {
489  LOG(INFO) << "connect() succeeded immediately; aborting test "
490  "of read-during-connect behavior";
491  return;
492  }
493 
494  ReadCallback rcb;
495  socket->setReadCB(&rcb);
496 
497  // close()
498  socket->close();
499 
500  // Loop, although there shouldn't be anything to do.
501  evb.loop();
502 
503  ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
504  ASSERT_EQ(rcb.buffers.size(), 0);
505  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
506 
507  ASSERT_TRUE(socket->isClosedBySelf());
508  ASSERT_FALSE(socket->isClosedByPeer());
509 }
510 
// Queues a 128-byte write and installs a read callback before the event loop
// runs; the accepted server-side socket sends 128 bytes and half-closes. After
// looping, checks both directions received the expected data.
515 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
516  TestServer server;
517 
518  // connect()
519  EventBase evb;
520  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
521  if (GetParam() == TFOState::ENABLED) {
522  socket->enableTFO();
523  }
524  ConnCallback ccb;
525  socket->connect(&ccb, server.getAddress(), 30);
526 
527  // write()
528  char buf1[128];
529  memset(buf1, 'a', sizeof(buf1));
530  WriteCallback wcb;
531  socket->write(&wcb, buf1, sizeof(buf1));
532 
533  // set a read callback
534  ReadCallback rcb;
535  socket->setReadCB(&rcb);
536 
537  // Even though we haven't looped yet, we should be able to accept
538  // the connection and send data to it.
539  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
540  uint8_t buf2[128];
541  memset(buf2, 'b', sizeof(buf2));
542  acceptedSocket->write(buf2, sizeof(buf2));
543  acceptedSocket->flush();
544 
545  // shut down the write half of acceptedSocket, so that the AsyncSocket
546  // will stop reading and we can break out of the event loop.
547  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
548 
549  // Loop
550  evb.loop();
551 
552  // Make sure the connect succeeded
// NOTE(review): embedded line 553 (the check this comment describes) is absent
// from this listing — confirm upstream.
554 
555  // Make sure the AsyncSocket read the data written by the accepted socket
// NOTE(review): embedded line 556 is absent from this listing; a check on
// rcb.state may be missing here — confirm upstream.
557  ASSERT_EQ(rcb.buffers.size(), 1);
558  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
559  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
560 
561  // Close the AsyncSocket so we'll see EOF on acceptedSocket
562  socket->close();
563 
564  // Make sure the accepted socket saw the data written by the AsyncSocket
565  uint8_t readbuf[sizeof(buf1)];
566  acceptedSocket->readAll(readbuf, sizeof(readbuf));
567  ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
568  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
569  ASSERT_EQ(bytesRead, 0);
570 
571  ASSERT_FALSE(socket->isClosedBySelf());
572  ASSERT_TRUE(socket->isClosedByPeer());
573 }
574 
// Queues a 128-byte write and calls shutdownWrite() while the connect is
// pending, then checks that the peer receives the data followed by EOF and
// that the AsyncSocket can still read the 192 bytes the peer sent.
// Returns early if the connect had already succeeded synchronously.
579 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
580  TestServer server;
581 
582  // connect()
583  EventBase evb;
584  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
585  ConnCallback ccb;
586  socket->connect(&ccb, server.getAddress(), 30);
587 
588  // Hopefully the connect didn't succeed immediately.
589  // If it did, we can't exercise the write-while-connecting code path.
590  if (ccb.state == STATE_SUCCEEDED) {
591  LOG(INFO) << "connect() succeeded immediately; skipping test";
592  return;
593  }
594 
595  // Ask to write some data
596  char wbuf[128];
597  memset(wbuf, 'a', sizeof(wbuf));
598  WriteCallback wcb;
599  socket->write(&wcb, wbuf, sizeof(wbuf));
// NOTE(review): shutdownWrite() is called here and again three lines below;
// one of the two calls looks redundant — confirm whether the repetition is
// intentional.
600  socket->shutdownWrite();
601 
602  // Shutdown writes
603  socket->shutdownWrite();
604 
605  // Even though we haven't looped yet, we should be able to accept
606  // the connection.
607  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
608 
609  // Since the connection is still in progress, there should be no data to
610  // read yet. Verify that the accepted socket is not readable.
611  struct pollfd fds[1];
612  fds[0].fd = acceptedSocket->getSocketFD();
613  fds[0].events = POLLIN;
614  fds[0].revents = 0;
615  int rc = poll(fds, 1, 0);
616  ASSERT_EQ(rc, 0);
617 
618  // Write data to the accepted socket
619  uint8_t acceptedWbuf[192];
620  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
621  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
622  acceptedSocket->flush();
623 
624  // Loop
625  evb.loop();
626 
627  // The loop should have completed the connection, written the queued data,
628  // and shutdown writes on the socket.
629  //
630  // Check that the connection was completed successfully and that the write
631  // callback succeeded.
// NOTE(review): embedded lines 632-633 (the checks this comment describes) are
// absent from this listing — confirm upstream.
634 
635  // Check that we can read the data that was written to the socket, and that
636  // we see an EOF, since its socket was half-shutdown.
637  uint8_t readbuf[sizeof(wbuf)];
638  acceptedSocket->readAll(readbuf, sizeof(readbuf));
639  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
640  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
641  ASSERT_EQ(bytesRead, 0);
642 
643  // Close the accepted socket. This will cause it to see EOF
644  // and uninstall the read callback when we loop next.
645  acceptedSocket->close();
646 
647  // Install a read callback, then loop again.
648  ReadCallback rcb;
649  socket->setReadCB(&rcb);
650  evb.loop();
651 
652  // This loop should have read the data and seen the EOF
// NOTE(review): embedded line 653 is absent from this listing; a check on
// rcb.state may be missing here — confirm upstream.
654  ASSERT_EQ(rcb.buffers.size(), 1);
655  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
656  ASSERT_EQ(
657  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
658 
659  ASSERT_FALSE(socket->isClosedBySelf());
660  ASSERT_FALSE(socket->isClosedByPeer());
661 }
662 
// Installs a read callback, queues a 128-byte write and calls shutdownWrite()
// while the connect is pending. The peer sends 192 bytes and half-closes; the
// test then checks both directions received the expected data and EOF.
// Returns early if the connect had already succeeded synchronously.
667 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
668  TestServer server;
669 
670  // connect()
671  EventBase evb;
672  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
673  ConnCallback ccb;
674  socket->connect(&ccb, server.getAddress(), 30);
675 
676  // Hopefully the connect didn't succeed immediately.
677  // If it did, we can't exercise the write-while-connecting code path.
678  if (ccb.state == STATE_SUCCEEDED) {
679  LOG(INFO) << "connect() succeeded immediately; skipping test";
680  return;
681  }
682 
683  // Install a read callback
684  ReadCallback rcb;
685  socket->setReadCB(&rcb);
686 
687  // Ask to write some data
688  char wbuf[128];
689  memset(wbuf, 'a', sizeof(wbuf));
690  WriteCallback wcb;
691  socket->write(&wcb, wbuf, sizeof(wbuf));
692 
693  // Shutdown writes
694  socket->shutdownWrite();
695 
696  // Even though we haven't looped yet, we should be able to accept
697  // the connection.
698  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
699 
700  // Since the connection is still in progress, there should be no data to
701  // read yet. Verify that the accepted socket is not readable.
702  struct pollfd fds[1];
703  fds[0].fd = acceptedSocket->getSocketFD();
704  fds[0].events = POLLIN;
705  fds[0].revents = 0;
706  int rc = poll(fds, 1, 0);
707  ASSERT_EQ(rc, 0);
708 
709  // Write data to the accepted socket
710  uint8_t acceptedWbuf[192];
711  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
712  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
713  acceptedSocket->flush();
714  // Shutdown writes to the accepted socket. This will cause it to see EOF
715  // and uninstall the read callback.
716  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
717 
718  // Loop
719  evb.loop();
720 
721  // The loop should have completed the connection, written the queued data,
722  // shutdown writes on the socket, read the data we wrote to it, and see the
723  // EOF.
724  //
725  // Check that the connection was completed successfully and that the read
726  // and write callbacks were invoked as expected.
// NOTE(review): embedded lines 727-728 are absent from this listing; checks on
// the callback states may be missing here — confirm upstream.
729  ASSERT_EQ(rcb.buffers.size(), 1);
730  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
731  ASSERT_EQ(
732  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
// NOTE(review): embedded line 733 is absent from this listing; a further check
// may be missing here — confirm upstream.
734 
735  // Check that we can read the data that was written to the socket, and that
736  // we see an EOF, since its socket was half-shutdown.
737  uint8_t readbuf[sizeof(wbuf)];
738  acceptedSocket->readAll(readbuf, sizeof(readbuf));
739  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
740  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
741  ASSERT_EQ(bytesRead, 0);
742 
743  // Fully close both sockets
744  acceptedSocket->close();
745  socket->close();
746 
747  ASSERT_FALSE(socket->isClosedBySelf());
748  ASSERT_TRUE(socket->isClosedByPeer());
749 }
750 
// Same setup as the shutdownWrite() variant, but calls shutdownWriteNow()
// right after queuing the write. Expects zero bytes written by the write
// callback and an immediate EOF (no data) on the peer, while the AsyncSocket
// still reads the 192 bytes the peer sent.
// Returns early if the connect had already succeeded synchronously.
755 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
756  TestServer server;
757 
758  // connect()
759  EventBase evb;
760  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
761  ConnCallback ccb;
762  socket->connect(&ccb, server.getAddress(), 30);
763 
764  // Hopefully the connect didn't succeed immediately.
765  // If it did, we can't exercise the write-while-connecting code path.
766  if (ccb.state == STATE_SUCCEEDED) {
767  LOG(INFO) << "connect() succeeded immediately; skipping test";
768  return;
769  }
770 
771  // Install a read callback
772  ReadCallback rcb;
773  socket->setReadCB(&rcb);
774 
775  // Ask to write some data
776  char wbuf[128];
777  memset(wbuf, 'a', sizeof(wbuf));
778  WriteCallback wcb;
779  socket->write(&wcb, wbuf, sizeof(wbuf));
780 
781  // Shutdown writes immediately.
782  // This should immediately discard the data that we just tried to write.
783  socket->shutdownWriteNow();
784 
785  // Verify that writeError() was invoked on the write callback.
// NOTE(review): embedded line 786 is absent from this listing; a check on
// wcb.state may be missing here — confirm upstream.
787  ASSERT_EQ(wcb.bytesWritten, 0);
788 
789  // Even though we haven't looped yet, we should be able to accept
790  // the connection.
791  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
792 
793  // Since the connection is still in progress, there should be no data to
794  // read yet. Verify that the accepted socket is not readable.
795  struct pollfd fds[1];
796  fds[0].fd = acceptedSocket->getSocketFD();
797  fds[0].events = POLLIN;
798  fds[0].revents = 0;
799  int rc = poll(fds, 1, 0);
800  ASSERT_EQ(rc, 0);
801 
802  // Write data to the accepted socket
803  uint8_t acceptedWbuf[192];
804  memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
805  acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
806  acceptedSocket->flush();
807  // Shutdown writes to the accepted socket. This will cause it to see EOF
808  // and uninstall the read callback.
809  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
810 
811  // Loop
812  evb.loop();
813 
814  // The loop should have completed the connection, written the queued data,
815  // shutdown writes on the socket, read the data we wrote to it, and see the
816  // EOF.
817  //
818  // Check that the connection was completed successfully and that the read
819  // callback was invoked as expected.
// NOTE(review): embedded lines 820-821 are absent from this listing; checks on
// ccb/rcb state may be missing here — confirm upstream.
822  ASSERT_EQ(rcb.buffers.size(), 1);
823  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
824  ASSERT_EQ(
825  memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
826 
827  // Since we used shutdownWriteNow(), it should have discarded all pending
828  // write data. Verify we see an immediate EOF when reading from the accepted
829  // socket.
830  uint8_t readbuf[sizeof(wbuf)];
831  uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
832  ASSERT_EQ(bytesRead, 0);
833 
834  // Fully close both sockets
835  acceptedSocket->close();
836  socket->close();
837 
838  ASSERT_FALSE(socket->isClosedBySelf());
839  ASSERT_TRUE(socket->isClosedByPeer());
840 }
841 
842 // Helper function for use in testConnectOptWrite()
843 // Temporarily disable the read callback
845  // Uninstall the read callback
846  socket->setReadCB(nullptr);
847  // Schedule the read callback to be reinstalled after 1ms
848  socket->getEventBase()->runInLoop(
849  std::bind(&AsyncSocket::setReadCB, socket, rcb));
850 }
851 
// Exercises writes issued around connect completion.
//   size1: bytes written immediately after connect(), before it completes
//          (skipped when 0).
//   size2: bytes written from the connect success callback (skipped when 0);
//          when that write succeeds its callback calls closeNow().
//   close: if true, close() is called before the connect completes.
// After the event loop finishes, the data collected by the accepting side's
// read callback is compared against buf1 followed by buf2, and the total byte
// count must equal size1 + size2.
// Returns early if the connect had already succeeded synchronously.
858 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
859  TestServer server;
860  EventBase evb;
861  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
862 
863  // connect()
864  ConnCallback ccb;
865  socket->connect(&ccb, server.getAddress(), 30);
866 
867  // Hopefully the connect didn't succeed immediately.
868  // If it did, we can't exercise the optimistic write code path.
869  if (ccb.state == STATE_SUCCEEDED) {
870  LOG(INFO) << "connect() succeeded immediately; aborting test "
871  "of optimistic write behavior";
872  return;
873  }
874 
875  // Tell the connect callback to perform a write when the connect succeeds
876  WriteCallback wcb2;
877  std::unique_ptr<char[]> buf2(new char[size2]);
878  memset(buf2.get(), 'b', size2);
879  if (size2 > 0) {
880  ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
881  // Tell the second write callback to close the connection when it is done
882  wcb2.successCallback = [&] { socket->closeNow(); };
883  }
884 
885  // Schedule one write() immediately, before the connect finishes
886  std::unique_ptr<char[]> buf1(new char[size1]);
887  memset(buf1.get(), 'a', size1);
888  WriteCallback wcb1;
889  if (size1 > 0) {
890  socket->write(&wcb1, buf1.get(), size1);
891  }
892 
893  if (close) {
894  // immediately perform a close, before connect() completes
895  socket->close();
896  }
897 
898  // Start reading from the other endpoint after 10ms.
899  // If we're using large buffers, we have to read so that the writes don't
900  // block forever.
901  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
902  ReadCallback rcb;
903  rcb.dataAvailableCallback =
904  std::bind(tmpDisableReads, acceptedSocket.get(), &rcb);
905  socket->getEventBase()->tryRunAfterDelay(
906  std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb), 10);
907 
// NOTE(review): the comment below says the server socket is not accepted yet,
// but acceptAsync() was already called above — the wording looks stale.
908  // Loop. We don't bother accepting on the server socket yet.
909  // The kernel should be able to buffer the write request so it can succeed.
910  evb.loop();
911 
// NOTE(review): embedded line 912 is absent from this listing; a check on
// ccb.state may be missing here — confirm upstream.
913  if (size1 > 0) {
914  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
915  }
916  if (size2 > 0) {
// NOTE(review): embedded line 917 (the body of this `if`, presumably a check
// on wcb2.state) is absent from this listing — confirm upstream.
918  }
919 
920  socket->close();
921 
922  // Make sure the read callback received all of the data
// Each received buffer covers stream offsets [start, end); the part below
// size1 is compared with buf1 and the part at or above size1 with buf2.
923  size_t bytesRead = 0;
924  for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
925  it != rcb.buffers.end();
926  ++it) {
927  size_t start = bytesRead;
928  bytesRead += it->length;
929  size_t end = bytesRead;
930  if (start < size1) {
931  size_t cmpLen = min(size1, end) - start;
932  ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
933  }
934  if (end > size1 && end <= size1 + size2) {
935  size_t itOffset;
936  size_t buf2Offset;
937  size_t cmpLen;
938  if (start >= size1) {
// Buffer lies entirely within the buf2 region.
939  itOffset = 0;
940  buf2Offset = start - size1;
941  cmpLen = end - start;
942  } else {
// Buffer straddles the buf1/buf2 boundary; compare only its tail.
943  itOffset = size1 - start;
944  buf2Offset = 0;
945  cmpLen = end - size1;
946  }
947  ASSERT_EQ(
948  memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, cmpLen), 0);
949  }
950  }
951  ASSERT_EQ(bytesRead, size1 + size2);
952 }
953 
954 TEST(AsyncSocketTest, ConnectCallbackWrite) {
955  // Test using small writes that should both succeed immediately
956  testConnectOptWrite(100, 200);
957 
958  // Test using a large buffer in the connect callback, that should block
959  const size_t largeSize = 32 * 1024 * 1024;
960  testConnectOptWrite(100, largeSize);
961 
962  // Test using a large initial write
963  testConnectOptWrite(largeSize, 100);
964 
965  // Test using two large buffers
966  testConnectOptWrite(largeSize, largeSize);
967 
968  // Test a small write in the connect callback,
969  // but no immediate write before connect completes
970  testConnectOptWrite(0, 64);
971 
972  // Test a large write in the connect callback,
973  // but no immediate write before connect completes
974  testConnectOptWrite(0, largeSize);
975 
976  // Test connect, a small write, then immediately call close() before connect
977  // completes
978  testConnectOptWrite(211, 0, true);
979 
980  // Test connect, a large immediate write (that will block), then immediately
981  // call close() before connect completes
982  testConnectOptWrite(largeSize, 0, true);
983 }
984 
986 // write() related tests
988 
992 TEST(AsyncSocketTest, WriteNullCallback) {
993  TestServer server;
994 
995  // connect()
996  EventBase evb;
997  std::shared_ptr<AsyncSocket> socket =
998  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
999  evb.loop(); // loop until the socket is connected
1000 
1001  // write() with a nullptr callback
1002  char buf[128];
1003  memset(buf, 'a', sizeof(buf));
1004  socket->write(nullptr, buf, sizeof(buf));
1005 
1006  evb.loop(); // loop until the data is sent
1007 
1008  // Make sure the server got a connection and received the data
1009  socket->close();
1010  server.verifyConnection(buf, sizeof(buf));
1011 
1012  ASSERT_TRUE(socket->isClosedBySelf());
1013  ASSERT_FALSE(socket->isClosedByPeer());
1014 }
1015 
1019 TEST(AsyncSocketTest, WriteTimeout) {
1020  TestServer server;
1021 
1022  // connect()
1023  EventBase evb;
1024  std::shared_ptr<AsyncSocket> socket =
1025  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1026  evb.loop(); // loop until the socket is connected
1027 
1028  // write() a large chunk of data, with no-one on the other end reading.
1029  // Tricky: the kernel caches the connection metrics for recently-used
1030  // routes (see tcp_no_metrics_save) so a freshly opened connection can
1031  // have a send buffer size bigger than wmem_default. This makes the test
1032  // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1033  size_t writeLength = 32 * 1024 * 1024;
1034  uint32_t timeout = 200;
1035  socket->setSendTimeout(timeout);
1036  std::unique_ptr<char[]> buf(new char[writeLength]);
1037  memset(buf.get(), 'a', writeLength);
1038  WriteCallback wcb;
1039  socket->write(&wcb, buf.get(), writeLength);
1040 
1041  TimePoint start;
1042  evb.loop();
1043  TimePoint end;
1044 
1045  // Make sure the write attempt timed out as requested
1046  ASSERT_EQ(wcb.state, STATE_FAILED);
1047  ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1048 
1049  // Check that the write timed out within a reasonable period of time.
1050  // We don't check for exactly the specified timeout, since AsyncSocket only
1051  // times out when it hasn't made progress for that period of time.
1052  //
1053  // On linux, the first write sends a few hundred kb of data, then blocks for
1054  // writability, and then unblocks again after 40ms and is able to write
1055  // another smaller of data before blocking permanently. Therefore it doesn't
1056  // time out until 40ms + timeout.
1057  //
1058  // I haven't fully verified the cause of this, but I believe it probably
1059  // occurs because the receiving end delays sending an ack for up to 40ms.
1060  // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1061  // the ack, it can send some more data. However, after that point the
1062  // receiver's kernel buffer is full. This 40ms delay happens even with
1063  // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1064  // kernel may be automatically disabling TCP_QUICKACK after receiving some
1065  // data.
1066  //
1067  // For now, we simply check that the timeout occurred within 160ms of
1068  // the requested value.
1069  T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1070 }
1071 
1075 TEST(AsyncSocketTest, WritePipeError) {
1076  TestServer server;
1077 
1078  // connect()
1079  EventBase evb;
1080  std::shared_ptr<AsyncSocket> socket =
1081  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1082  socket->setSendTimeout(1000);
1083  evb.loop(); // loop until the socket is connected
1084 
1085  // accept and immediately close the socket
1086  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1087  acceptedSocket->close();
1088 
1089  // write() a large chunk of data
1090  size_t writeLength = 32 * 1024 * 1024;
1091  std::unique_ptr<char[]> buf(new char[writeLength]);
1092  memset(buf.get(), 'a', writeLength);
1093  WriteCallback wcb;
1094  socket->write(&wcb, buf.get(), writeLength);
1095 
1096  evb.loop();
1097 
1098  // Make sure the write failed.
1099  // It would be nice if AsyncSocketException could convey the errno value,
1100  // so that we could check for EPIPE
1101  ASSERT_EQ(wcb.state, STATE_FAILED);
1102  ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::INTERNAL_ERROR);
1103 
1104  ASSERT_FALSE(socket->isClosedBySelf());
1105  ASSERT_FALSE(socket->isClosedByPeer());
1106 }
1107 
1111 TEST(AsyncSocketTest, WriteAfterReadEOF) {
1112  TestServer server;
1113 
1114  // connect()
1115  EventBase evb;
1116  std::shared_ptr<AsyncSocket> socket =
1117  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1118  evb.loop(); // loop until the socket is connected
1119 
1120  // Accept the connection
1121  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1122  ReadCallback rcb;
1123  acceptedSocket->setReadCB(&rcb);
1124 
1125  // Shutdown the write side of client socket (read side of server socket)
1126  socket->shutdownWrite();
1127  evb.loop();
1128 
1129  // Check that accepted socket is still writable
1130  ASSERT_FALSE(acceptedSocket->good());
1131  ASSERT_TRUE(acceptedSocket->writable());
1132 
1133  // Write data to accepted socket
1134  constexpr size_t simpleBufLength = 5;
1135  char simpleBuf[simpleBufLength];
1136  memset(simpleBuf, 'a', simpleBufLength);
1137  WriteCallback wcb;
1138  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1139  evb.loop();
1140 
1141  // Make sure we were able to write even after getting a read EOF
1142  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
1144 }
1145 
1149 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1150  // Send and receive buffer sizes for the sockets.
1151  // Note that Linux will double this value to allow space for bookkeeping
1152  // overhead.
1153  constexpr size_t kSockBufSize = 8 * 1024;
1154  constexpr size_t kEffectiveSockBufSize = 2 * kSockBufSize;
1155 
1156  TestServer server(false, kSockBufSize);
1157 
1158  AsyncSocket::OptionMap options{
1159  {{SOL_SOCKET, SO_SNDBUF}, int(kSockBufSize)},
1160  {{SOL_SOCKET, SO_RCVBUF}, int(kSockBufSize)},
1161  {{IPPROTO_TCP, TCP_NODELAY}, 1},
1162  };
1163 
1164  // The current thread will be used by the receiver - use a separate thread
1165  // for the sender.
1166  EventBase senderEvb;
1167  std::thread senderThread([&]() { senderEvb.loopForever(); });
1168 
1169  ConnCallback ccb;
1170  std::shared_ptr<AsyncSocket> socket;
1171 
1172  senderEvb.runInEventBaseThreadAndWait([&]() {
1173  socket = AsyncSocket::newSocket(&senderEvb);
1174  socket->connect(&ccb, server.getAddress(), 30, options);
1175  });
1176 
1177  // accept the socket on the server side
1178  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1179 
1180  // Send a big (100KB) write so that it is partially written.
1181  constexpr size_t kSendSize = 100 * 1024;
1182  auto const sendBuf = std::vector<char>(kSendSize, 'a');
1183 
1184  WriteCallback wcb;
1185 
1186  senderEvb.runInEventBaseThreadAndWait(
1187  [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
1188 
1189  // Read 20KB of data from the socket to allow the sender to send a bit more
1190  // data after it initially blocks.
1191  constexpr size_t kRecvSize = 20 * 1024;
1192  uint8_t recvBuf[kRecvSize];
1193  auto bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1194  ASSERT_EQ(kRecvSize, bytesRead);
1195  EXPECT_EQ(0, memcmp(recvBuf, sendBuf.data(), bytesRead));
1196 
1197  // We should be able to send at least the amount of data received plus the
1198  // send buffer size. In practice we should probably be able to send
1199  constexpr size_t kMinExpectedBytesWritten = kRecvSize + kSockBufSize;
1200 
1201  // We shouldn't be able to send more than the amount of data received plus
1202  // the send buffer size of the sending socket (kEffectiveSockBufSize) plus
1203  // the receive buffer size on the receiving socket (kEffectiveSockBufSize)
1204  constexpr size_t kMaxExpectedBytesWritten =
1205  kRecvSize + kEffectiveSockBufSize + kEffectiveSockBufSize;
1206  static_assert(
1207  kMaxExpectedBytesWritten < kSendSize, "kSendSize set too small");
1208 
1209  // Need to delay after receiving 20KB and before closing the receive side so
1210  // that the send side has a chance to fill the send buffer past.
1211  using clock = std::chrono::steady_clock;
1212  auto const deadline = clock::now() + std::chrono::seconds(2);
1213  while (wcb.bytesWritten < kMinExpectedBytesWritten &&
1214  clock::now() < deadline) {
1216  }
1217  acceptedSocket->closeWithReset();
1218 
1219  senderEvb.terminateLoopSoon();
1220  senderThread.join();
1221 
1222  ASSERT_EQ(STATE_FAILED, wcb.state);
1223  ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
1224  ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
1225 }
1226 
1230 TEST(AsyncSocketTest, WriteIOBuf) {
1231  TestServer server;
1232 
1233  // connect()
1234  EventBase evb;
1235  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1236  ConnCallback ccb;
1237  socket->connect(&ccb, server.getAddress(), 30);
1238 
1239  // Accept the connection
1240  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1241  ReadCallback rcb;
1242  acceptedSocket->setReadCB(&rcb);
1243 
1244  // Check if EOR tracking flag can be set and reset.
1245  EXPECT_FALSE(socket->isEorTrackingEnabled());
1246  socket->setEorTracking(true);
1247  EXPECT_TRUE(socket->isEorTrackingEnabled());
1248  socket->setEorTracking(false);
1249  EXPECT_FALSE(socket->isEorTrackingEnabled());
1250 
1251  // Write a simple buffer to the socket
1252  constexpr size_t simpleBufLength = 5;
1253  char simpleBuf[simpleBufLength];
1254  memset(simpleBuf, 'a', simpleBufLength);
1255  WriteCallback wcb;
1256  socket->write(&wcb, simpleBuf, simpleBufLength);
1257 
1258  // Write a single-element IOBuf chain
1259  size_t buf1Length = 7;
1260  unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1261  memset(buf1->writableData(), 'b', buf1Length);
1262  buf1->append(buf1Length);
1263  unique_ptr<IOBuf> buf1Copy(buf1->clone());
1264  WriteCallback wcb2;
1265  socket->writeChain(&wcb2, std::move(buf1));
1266 
1267  // Write a multiple-element IOBuf chain
1268  size_t buf2Length = 11;
1269  unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1270  memset(buf2->writableData(), 'c', buf2Length);
1271  buf2->append(buf2Length);
1272  size_t buf3Length = 13;
1273  unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1274  memset(buf3->writableData(), 'd', buf3Length);
1275  buf3->append(buf3Length);
1276  buf2->appendChain(std::move(buf3));
1277  unique_ptr<IOBuf> buf2Copy(buf2->clone());
1278  buf2Copy->coalesce();
1279  WriteCallback wcb3;
1280  socket->writeChain(&wcb3, std::move(buf2));
1281  socket->shutdownWrite();
1282 
1283  // Let the reads and writes run to completion
1284  evb.loop();
1285 
1287  ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1289 
1290  // Make sure the reader got the right data in the right order
1292  ASSERT_EQ(rcb.buffers.size(), 1);
1293  ASSERT_EQ(
1294  rcb.buffers[0].length,
1295  simpleBufLength + buf1Length + buf2Length + buf3Length);
1296  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1297  ASSERT_EQ(
1298  memcmp(
1299  rcb.buffers[0].buffer + simpleBufLength,
1300  buf1Copy->data(),
1301  buf1Copy->length()),
1302  0);
1303  ASSERT_EQ(
1304  memcmp(
1305  rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1306  buf2Copy->data(),
1307  buf2Copy->length()),
1308  0);
1309 
1310  acceptedSocket->close();
1311  socket->close();
1312 
1313  ASSERT_TRUE(socket->isClosedBySelf());
1314  ASSERT_FALSE(socket->isClosedByPeer());
1315 }
1316 
1317 TEST(AsyncSocketTest, WriteIOBufCorked) {
1318  TestServer server;
1319 
1320  // connect()
1321  EventBase evb;
1322  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1323  ConnCallback ccb;
1324  socket->connect(&ccb, server.getAddress(), 30);
1325 
1326  // Accept the connection
1327  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1328  ReadCallback rcb;
1329  acceptedSocket->setReadCB(&rcb);
1330 
1331  // Do three writes, 100ms apart, with the "cork" flag set
1332  // on the second write. The reader should see the first write
1333  // arrive by itself, followed by the second and third writes
1334  // arriving together.
1335  size_t buf1Length = 5;
1336  unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1337  memset(buf1->writableData(), 'a', buf1Length);
1338  buf1->append(buf1Length);
1339  size_t buf2Length = 7;
1340  unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1341  memset(buf2->writableData(), 'b', buf2Length);
1342  buf2->append(buf2Length);
1343  size_t buf3Length = 11;
1344  unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1345  memset(buf3->writableData(), 'c', buf3Length);
1346  buf3->append(buf3Length);
1347  WriteCallback wcb1;
1348  socket->writeChain(&wcb1, std::move(buf1));
1349  WriteCallback wcb2;
1350  DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1351  write2.scheduleTimeout(100);
1352  WriteCallback wcb3;
1353  DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1354  write3.scheduleTimeout(140);
1355 
1356  evb.loop();
1360  if (wcb3.state != STATE_SUCCEEDED) {
1361  throw(wcb3.exception);
1362  }
1364 
1365  // Make sure the reader got the data with the right grouping
1367  ASSERT_EQ(rcb.buffers.size(), 2);
1368  ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1369  ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1370 
1371  acceptedSocket->close();
1372  socket->close();
1373 
1374  ASSERT_TRUE(socket->isClosedBySelf());
1375  ASSERT_FALSE(socket->isClosedByPeer());
1376 }
1377 
1381 TEST(AsyncSocketTest, ZeroLengthWrite) {
1382  TestServer server;
1383 
1384  // connect()
1385  EventBase evb;
1386  std::shared_ptr<AsyncSocket> socket =
1387  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1388  evb.loop(); // loop until the socket is connected
1389 
1390  auto acceptedSocket = server.acceptAsync(&evb);
1391  ReadCallback rcb;
1392  acceptedSocket->setReadCB(&rcb);
1393 
1394  size_t len1 = 1024 * 1024;
1395  size_t len2 = 1024 * 1024;
1396  std::unique_ptr<char[]> buf(new char[len1 + len2]);
1397  memset(buf.get(), 'a', len1);
1398  memset(buf.get(), 'b', len2);
1399 
1400  WriteCallback wcb1;
1401  WriteCallback wcb2;
1402  WriteCallback wcb3;
1403  WriteCallback wcb4;
1404  socket->write(&wcb1, buf.get(), 0);
1405  socket->write(&wcb2, buf.get(), len1);
1406  socket->write(&wcb3, buf.get() + len1, 0);
1407  socket->write(&wcb4, buf.get() + len1, len2);
1408  socket->close();
1409 
1410  evb.loop(); // loop until the data is sent
1411 
1412  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1416  rcb.verifyData(buf.get(), len1 + len2);
1417 
1418  ASSERT_TRUE(socket->isClosedBySelf());
1419  ASSERT_FALSE(socket->isClosedByPeer());
1420 }
1421 
1422 TEST(AsyncSocketTest, ZeroLengthWritev) {
1423  TestServer server;
1424 
1425  // connect()
1426  EventBase evb;
1427  std::shared_ptr<AsyncSocket> socket =
1428  AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1429  evb.loop(); // loop until the socket is connected
1430 
1431  auto acceptedSocket = server.acceptAsync(&evb);
1432  ReadCallback rcb;
1433  acceptedSocket->setReadCB(&rcb);
1434 
1435  size_t len1 = 1024 * 1024;
1436  size_t len2 = 1024 * 1024;
1437  std::unique_ptr<char[]> buf(new char[len1 + len2]);
1438  memset(buf.get(), 'a', len1);
1439  memset(buf.get(), 'b', len2);
1440 
1441  WriteCallback wcb;
1442  constexpr size_t iovCount = 4;
1443  struct iovec iov[iovCount];
1444  iov[0].iov_base = buf.get();
1445  iov[0].iov_len = len1;
1446  iov[1].iov_base = buf.get() + len1;
1447  iov[1].iov_len = 0;
1448  iov[2].iov_base = buf.get() + len1;
1449  iov[2].iov_len = len2;
1450  iov[3].iov_base = buf.get() + len1 + len2;
1451  iov[3].iov_len = 0;
1452 
1453  socket->writev(&wcb, iov, iovCount);
1454  socket->close();
1455  evb.loop(); // loop until the data is sent
1456 
1457  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1458  rcb.verifyData(buf.get(), len1 + len2);
1459 
1460  ASSERT_TRUE(socket->isClosedBySelf());
1461  ASSERT_FALSE(socket->isClosedByPeer());
1462 }
1463 
1465 // close() related tests
1467 
1471 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1472  TestServer server;
1473 
1474  // connect()
1475  EventBase evb;
1476  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1477  ConnCallback ccb;
1478  socket->connect(&ccb, server.getAddress(), 30);
1479 
1480  // accept the socket on the server side
1481  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1482 
1483  // Loop to ensure the connect has completed
1484  evb.loop();
1485 
1486  // Make sure we are connected
1488 
1489  // Schedule pending writes, until several write attempts have blocked
1490  char buf[128];
1491  memset(buf, 'a', sizeof(buf));
1492  typedef vector<std::shared_ptr<WriteCallback>> WriteCallbackVector;
1493  WriteCallbackVector writeCallbacks;
1494 
1495  writeCallbacks.reserve(5);
1496  while (writeCallbacks.size() < 5) {
1497  std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1498 
1499  socket->write(wcb.get(), buf, sizeof(buf));
1500  if (wcb->state == STATE_SUCCEEDED) {
1501  // Succeeded immediately. Keep performing more writes
1502  continue;
1503  }
1504 
1505  // This write is blocked.
1506  // Have the write callback call close() when writeError() is invoked
1507  wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1508  writeCallbacks.push_back(wcb);
1509  }
1510 
1511  // Call closeNow() to immediately fail the pending writes
1512  socket->closeNow();
1513 
1514  // Make sure writeError() was invoked on all of the pending write callbacks
1515  for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1516  it != writeCallbacks.end();
1517  ++it) {
1518  ASSERT_EQ((*it)->state, STATE_FAILED);
1519  }
1520 
1521  ASSERT_TRUE(socket->isClosedBySelf());
1522  ASSERT_FALSE(socket->isClosedByPeer());
1523 }
1524 
1526 // ImmediateRead related tests
1528 
1529 /* AsyncSocket helper used to verify that immediate read works */
// NOTE(review): the class header, the constructor and the signature/body of
// the overridden hook (listing lines 1530, 1533, 1536 and 1538) are absent
// from this listing; only the visible members are documented here.
1531  public:
// Flag set to true by the protected hook below. The tests in this file clear
// it after connecting and inspect it once the event loop has finished.
1532  bool immediateReadCalled = false;
1534 
1535  protected:
// Body of the protected hook: records that it was invoked.
1537  immediateReadCalled = true;
1539  }
1540 };
1541 
// Client sends 300 bytes; the server echoes them back once it has received
// all of them and then closes. The client reads with a 100-byte-limited
// ReadCallback and at most one read per event, and the test checks that the
// socket's immediate-read hook was triggered along the way.
// NOTE(review): the declaration of `socket` (listing line 1553) and listing
// line 1580 are absent from this listing.
1542 TEST(AsyncSocket, ConnectReadImmediateRead) {
1543  TestServer server;
1544 
  // The payload is three times the read-callback size passed to rcb below.
1545  const size_t maxBufferSz = 100;
1546  const size_t maxReadsPerEvent = 1;
1547  const size_t expectedDataSz = maxBufferSz * 3;
1548  char expectedData[expectedDataSz];
1549  memset(expectedData, 'j', expectedDataSz);
1550 
1551  EventBase evb;
1552  ReadCallback rcb(maxBufferSz);
1554  socket.connect(nullptr, server.getAddress(), 30);
1555 
1556  evb.loop(); // loop until the socket is connected
1557 
  // Install the client read callback, limit reads per event, and clear the
  // flag so that only reads after this point are counted.
1558  socket.setReadCB(&rcb);
1559  socket.setMaxReadsPerEvent(maxReadsPerEvent);
1560  socket.immediateReadCalled = false;
1561 
1562  auto acceptedSocket = server.acceptAsync(&evb);
1563 
  // Server side: echo the payload back once all of it has arrived.
1564  ReadCallback rcbServer;
1565  WriteCallback wcbServer;
1566  rcbServer.dataAvailableCallback = [&]() {
1567  if (rcbServer.dataRead() == expectedDataSz) {
1568  // write back all data read
1569  rcbServer.verifyData(expectedData, expectedDataSz);
1570  acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1571  acceptedSocket->close();
1572  }
1573  };
1574  acceptedSocket->setReadCB(&rcbServer);
1575 
1576  // write data
1577  WriteCallback wcb1;
1578  socket.write(&wcb1, expectedData, expectedDataSz);
1579  evb.loop();
  // The client must have received the full echo, and the immediate-read hook
  // must have fired.
1581  rcb.verifyData(expectedData, expectedDataSz);
1582  ASSERT_EQ(socket.immediateReadCalled, true);
1583 
1584  ASSERT_FALSE(socket.isClosedBySelf());
1585  ASSERT_FALSE(socket.isClosedByPeer());
1586 }
1587 
// Same echo setup as ConnectReadImmediateRead, except that the client's read
// callback uninstalls itself on the first data notification. The test checks
// that only maxBufferSz bytes were read and that the immediate-read hook was
// not triggered.
// NOTE(review): the declaration of `socket` (listing line 1599) and listing
// line 1631 are absent from this listing.
1588 TEST(AsyncSocket, ConnectReadUninstallRead) {
1589  TestServer server;
1590 
1591  const size_t maxBufferSz = 100;
1592  const size_t maxReadsPerEvent = 1;
1593  const size_t expectedDataSz = maxBufferSz * 3;
1594  char expectedData[expectedDataSz];
1595  memset(expectedData, 'k', expectedDataSz);
1596 
1597  EventBase evb;
1598  ReadCallback rcb(maxBufferSz);
1600  socket.connect(nullptr, server.getAddress(), 30);
1601 
1602  evb.loop(); // loop until the socket is connected
1603 
1604  socket.setReadCB(&rcb);
1605  socket.setMaxReadsPerEvent(maxReadsPerEvent);
1606  socket.immediateReadCalled = false;
1607 
1608  auto acceptedSocket = server.acceptAsync(&evb);
1609 
  // Server side: echo the payload back once all of it has arrived.
1610  ReadCallback rcbServer;
1611  WriteCallback wcbServer;
1612  rcbServer.dataAvailableCallback = [&]() {
1613  if (rcbServer.dataRead() == expectedDataSz) {
1614  // write back all data read
1615  rcbServer.verifyData(expectedData, expectedDataSz);
1616  acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1617  acceptedSocket->close();
1618  }
1619  };
1620  acceptedSocket->setReadCB(&rcbServer);
1621 
  // Client side: drop the read callback as soon as the first data arrives.
1622  rcb.dataAvailableCallback = [&]() {
1623  // we read data and reset readCB
1624  socket.setReadCB(nullptr);
1625  };
1626 
1627  // write data
1628  WriteCallback wcb;
1629  socket.write(&wcb, expectedData, expectedDataSz);
1630  evb.loop();
1632 
1633  /* we should've only read maxBufferSz data since readCallback_
1634  * was reset in dataAvailableCallback */
1635  ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1636  ASSERT_EQ(socket.immediateReadCalled, false);
1637 
1638  ASSERT_FALSE(socket.isClosedBySelf());
1639  ASSERT_FALSE(socket.isClosedByPeer());
1640 }
1641 
1642 // TODO:
1643 // - Test connect() and have the connect callback set the read callback
1644 // - Test connect() and have the connect callback unset the read callback
1645 // - Test reading/writing/closing/destroying the socket in the connect callback
1646 // - Test reading/writing/closing/destroying the socket in the read callback
1647 // - Test reading/writing/closing/destroying the socket in the write callback
1648 // - Test one-way shutdown behavior
1649 // - Test changing the EventBase
1650 //
1651 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1652 // in connectSuccess(), readDataAvailable(), writeSuccess()
1653 
1655 // AsyncServerSocket tests
1657 
1661 TEST(AsyncSocketTest, ServerAcceptOptions) {
1662  EventBase eventBase;
1663 
1664  // Create a server socket
1665  std::shared_ptr<AsyncServerSocket> serverSocket(
1666  AsyncServerSocket::newSocket(&eventBase));
1667  serverSocket->bind(0);
1668  serverSocket->listen(16);
1669  folly::SocketAddress serverAddress;
1670  serverSocket->getAddress(&serverAddress);
1671 
1672  // Add a callback to accept one connection then stop the loop
1673  TestAcceptCallback acceptCallback;
1674  acceptCallback.setConnectionAcceptedFn(
1675  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1676  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1677  });
1678  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1679  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1680  });
1681  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1682  serverSocket->startAccepting();
1683 
1684  // Connect to the server socket
1685  std::shared_ptr<AsyncSocket> socket(
1686  AsyncSocket::newSocket(&eventBase, serverAddress));
1687 
1688  eventBase.loop();
1689 
1690  // Verify that the server accepted a connection
1691  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1692  ASSERT_EQ(
1693  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1694  ASSERT_EQ(
1695  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1696  ASSERT_EQ(
1697  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1698  int fd = acceptCallback.getEvents()->at(1).fd;
1699 
1700  // The accepted connection should already be in non-blocking mode
1701  int flags = fcntl(fd, F_GETFL, 0);
1702  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1703 
1704 #ifndef TCP_NOPUSH
1705  // The accepted connection should already have TCP_NODELAY set
1706  int value;
1707  socklen_t valueLength = sizeof(value);
1708  int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1709  ASSERT_EQ(rc, 0);
1710  ASSERT_EQ(value, 1);
1711 #endif
1712 }
1713 
// Exercise removeAcceptCallback() being called from inside accept callbacks:
// callbacks removing callbacks registered before them, after them, and
// themselves, and finally a callback destroying the server socket.
// NOTE(review): the lines that name which callback each lambda below is
// installed on (the `cbN.setConnectionAcceptedFn(` headers) and the per-event
// type assertions at the end are absent from this listing. The trailing
// "// cbN" comments on the newSocket() calls appear to name the callback that
// is expected to accept that connection - confirm against the full source.
1717 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1718  // Create a new AsyncServerSocket
1719  EventBase eventBase;
1720  std::shared_ptr<AsyncServerSocket> serverSocket(
1721  AsyncServerSocket::newSocket(&eventBase));
1722  serverSocket->bind(0);
1723  serverSocket->listen(16);
1724  folly::SocketAddress serverAddress;
1725  serverSocket->getAddress(&serverAddress);
1726 
1727  // Add several accept callbacks
1728  TestAcceptCallback cb1;
1729  TestAcceptCallback cb2;
1730  TestAcceptCallback cb3;
1731  TestAcceptCallback cb4;
1732  TestAcceptCallback cb5;
1733  TestAcceptCallback cb6;
1734  TestAcceptCallback cb7;
1735 
1736  // Test having callbacks remove other callbacks before them on the list,
1737  // after them on the list, or removing themselves.
1738  //
1739  // Have callback 2 remove callback 3 and callback 5 the first time it is
1740  // called.
1741  int cb2Count = 0;
  // Each of the following handlers opens a further client connection so that
  // the next callback in the rotation gets invoked.
1743  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1744  std::shared_ptr<AsyncSocket> sock2(AsyncSocket::newSocket(
1745  &eventBase, serverAddress)); // cb2: -cb3 -cb5
1746  });
1748  [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1750  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1751  std::shared_ptr<AsyncSocket> sock3(
1752  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1753  });
1755  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1756  std::shared_ptr<AsyncSocket> sock5(
1757  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1758  });
  // Handler that removes cb3 and cb5 on its first invocation only.
1760  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1761  if (cb2Count == 0) {
1762  serverSocket->removeAcceptCallback(&cb3, nullptr);
1763  serverSocket->removeAcceptCallback(&cb5, nullptr);
1764  }
1765  ++cb2Count;
1766  });
1767  // Have callback 6 remove callback 4 the first time it is called,
1768  // and destroy the server socket the second time it is called
1769  int cb6Count = 0;
1771  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1772  if (cb6Count == 0) {
1773  serverSocket->removeAcceptCallback(&cb4, nullptr);
1774  std::shared_ptr<AsyncSocket> sock6(
1775  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1776  std::shared_ptr<AsyncSocket> sock7(
1777  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1778  std::shared_ptr<AsyncSocket> sock8(
1779  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1780 
1781  } else {
1782  serverSocket.reset();
1783  }
1784  ++cb6Count;
1785  });
1786  // Have callback 7 remove itself
1788  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1789  serverSocket->removeAcceptCallback(&cb7, nullptr);
1790  });
1791 
  // Register the callbacks in order cb1..cb7; the expectations below depend
  // on this order.
1792  serverSocket->addAcceptCallback(&cb1, &eventBase);
1793  serverSocket->addAcceptCallback(&cb2, &eventBase);
1794  serverSocket->addAcceptCallback(&cb3, &eventBase);
1795  serverSocket->addAcceptCallback(&cb4, &eventBase);
1796  serverSocket->addAcceptCallback(&cb5, &eventBase);
1797  serverSocket->addAcceptCallback(&cb6, &eventBase);
1798  serverSocket->addAcceptCallback(&cb7, &eventBase);
1799  serverSocket->startAccepting();
1800 
1801  // Make several connections to the socket
1802  std::shared_ptr<AsyncSocket> sock1(
1803  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1804  std::shared_ptr<AsyncSocket> sock4(
1805  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1806 
1807  // Loop until we are stopped
1808  eventBase.loop();
1809 
1810  // Check to make sure that the expected callbacks were invoked.
1811  //
1812  // NOTE: This code depends on the AsyncServerSocket operating calling all of
1813  // the AcceptCallbacks in round-robin fashion, in the order that they were
1814  // added. The code is implemented this way right now, but the API doesn't
1815  // explicitly require it be done this way. If we change the code not to be
1816  // exactly round robin in the future, we can simplify the test checks here.
1817  // (We'll also need to update the termination code, since we expect cb6 to
1818  // get called twice to terminate the loop.)
1819  ASSERT_EQ(cb1.getEvents()->size(), 4);
1824 
1825  ASSERT_EQ(cb2.getEvents()->size(), 4);
1830 
1831  ASSERT_EQ(cb3.getEvents()->size(), 2);
1834 
1835  ASSERT_EQ(cb4.getEvents()->size(), 3);
1839 
1840  ASSERT_EQ(cb5.getEvents()->size(), 2);
1843 
1844  ASSERT_EQ(cb6.getEvents()->size(), 4);
1849 
1850  ASSERT_EQ(cb7.getEvents()->size(), 3);
1854 }
1855 
// Verify that accept callbacks run on the thread that drives the EventBase
// loop rather than on the thread that registered them: the started hook
// must see a thread id different from the test thread, and the later hooks
// must see that same loop thread.
// NOTE(review): the line installing the accepted-connection handler (listing
// line 1876) and the per-event assertions (1906-1908) are absent from this
// listing.
1859 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1860  // Create a new AsyncServerSocket
1861  EventBase eventBase;
1862  std::shared_ptr<AsyncServerSocket> serverSocket(
1863  AsyncServerSocket::newSocket(&eventBase));
1864  serverSocket->bind(0);
1865  serverSocket->listen(16);
1866  folly::SocketAddress serverAddress;
1867  serverSocket->getAddress(&serverAddress);
1868 
1869  // Add a single accept callback that tracks the thread it runs on
1870  TestAcceptCallback cb1;
1871  auto thread_id = std::this_thread::get_id();
1872  cb1.setAcceptStartedFn([&]() {
  // Must not be the test thread; remember the loop thread for later checks.
1873  CHECK_NE(thread_id, std::this_thread::get_id());
1874  thread_id = std::this_thread::get_id();
1875  });
1877  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1878  ASSERT_EQ(thread_id, std::this_thread::get_id());
1879  serverSocket->removeAcceptCallback(&cb1, &eventBase);
1880  });
1881  cb1.setAcceptStoppedFn(
1882  [&]() { ASSERT_EQ(thread_id, std::this_thread::get_id()); });
1883 
1884  // Register the callback and start accepting
1885  serverSocket->addAcceptCallback(&cb1, &eventBase);
1886  serverSocket->startAccepting();
1887 
1888  // Make one connection to the socket
1889  std::shared_ptr<AsyncSocket> sock1(
1890  AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1891 
1892  // Loop in another thread
1893  auto other = std::thread([&]() { eventBase.loop(); });
1894  other.join();
1895 
1896  // Check to make sure that the expected callbacks were invoked.
1897  //
1898  // cb1 is the only callback here, so it should have recorded exactly three
1899  // events (presumably start, accept and stop, as in the other accept
1900  // tests in this file).
1905  ASSERT_EQ(cb1.getEvents()->size(), 3);
1909 }
1910 
1912  EventBase* eventBase = serverSocket->getEventBase();
1913  CHECK(eventBase);
1914 
1915  // Add a callback to accept one connection then stop accepting
1916  TestAcceptCallback acceptCallback;
1917  acceptCallback.setConnectionAcceptedFn(
1918  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1919  serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
1920  });
1921  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1922  serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
1923  });
1924  serverSocket->addAcceptCallback(&acceptCallback, eventBase);
1925  serverSocket->startAccepting();
1926 
1927  // Connect to the server socket
1928  folly::SocketAddress serverAddress;
1929  serverSocket->getAddress(&serverAddress);
1930  AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1931 
1932  // Loop to process all events
1933  eventBase->loop();
1934 
1935  // Verify that the server accepted a connection
1936  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1937  ASSERT_EQ(
1938  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
1939  ASSERT_EQ(
1940  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
1941  ASSERT_EQ(
1942  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
1943 }
1944 
1945 /* Verify that we don't leak sockets if we are destroyed
1946  * while there are still writes pending.
1947  *
1948  * If destroy() only called close() instead of closeNow(),
1949  * it would shutdown(writes) on the socket, but the fd would
1950  * never be close()'d, and the socket would leak.
1951  */
// NOTE(review): the line that opens the callable wrapping the read() probe
// (listing line 1981, closed by the "});" below) is absent from this listing.
1952 TEST(AsyncSocketTest, DestroyCloseTest) {
1953  TestServer server;
1954 
1955  // connect()
1956  EventBase clientEB;
1957  EventBase serverEB;
1958  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1959  ConnCallback ccb;
1960  socket->connect(&ccb, server.getAddress(), 30);
1961 
1962  // Accept the connection
1963  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1964  ReadCallback rcb;
1965  acceptedSocket->setReadCB(&rcb);
1966 
1967  // Write a large buffer to the socket that is larger than kernel buffer
  // NOTE(review): simpleBuf is a raw new[] released only by the delete[] at
  // the end; presumably it leaks if a fatal assertion returns early.
1968  size_t simpleBufLength = 5000000;
1969  char* simpleBuf = new char[simpleBufLength];
1970  memset(simpleBuf, 'a', simpleBufLength);
1971  WriteCallback wcb;
1972 
1973  // Remember the accepted socket's fd so it can be probed after destruction
1974  int fd = acceptedSocket->getFd();
1975 
  // Queue the write, then drop both sockets without running either event
  // loop, so the write is still pending when the socket is destroyed.
1976  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1977  socket.reset();
1978  acceptedSocket.reset();
1979 
1980  // Test that server socket was closed
  // read() on an already-closed descriptor must fail with EBADF.
1982  ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1983  ASSERT_EQ(sz, -1);
1984  ASSERT_EQ(errno, EBADF);
1985  });
1986  delete[] simpleBuf;
1987 }
1988 
1992 TEST(AsyncSocketTest, ServerExistingSocket) {
1993  EventBase eventBase;
1994 
1995  // Test creating a socket, and letting AsyncServerSocket bind and listen
1996  {
1997  // Manually create a socket
1998  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1999  ASSERT_GE(fd, 0);
2000 
2001  // Create a server socket
2002  AsyncServerSocket::UniquePtr serverSocket(
2003  new AsyncServerSocket(&eventBase));
2004  serverSocket->useExistingSocket(fd);
2005  folly::SocketAddress address;
2006  serverSocket->getAddress(&address);
2007  address.setPort(0);
2008  serverSocket->bind(address);
2009  serverSocket->listen(16);
2010 
2011  // Make sure the socket works
2012  serverSocketSanityTest(serverSocket.get());
2013  }
2014 
2015  // Test creating a socket and binding manually,
2016  // then letting AsyncServerSocket listen
2017  {
2018  // Manually create a socket
2019  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2020  ASSERT_GE(fd, 0);
2021  // bind
2022  struct sockaddr_in addr;
2023  addr.sin_family = AF_INET;
2024  addr.sin_port = 0;
2025  addr.sin_addr.s_addr = INADDR_ANY;
2026  ASSERT_EQ(
2027  bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
2028  // Look up the address that we bound to
2029  folly::SocketAddress boundAddress;
2030  boundAddress.setFromLocalAddress(fd);
2031 
2032  // Create a server socket
2033  AsyncServerSocket::UniquePtr serverSocket(
2034  new AsyncServerSocket(&eventBase));
2035  serverSocket->useExistingSocket(fd);
2036  serverSocket->listen(16);
2037 
2038  // Make sure AsyncServerSocket reports the same address that we bound to
2039  folly::SocketAddress serverSocketAddress;
2040  serverSocket->getAddress(&serverSocketAddress);
2041  ASSERT_EQ(boundAddress, serverSocketAddress);
2042 
2043  // Make sure the socket works
2044  serverSocketSanityTest(serverSocket.get());
2045  }
2046 
2047  // Test creating a socket, binding and listening manually,
2048  // then giving it to AsyncServerSocket
2049  {
2050  // Manually create a socket
2051  int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2052  ASSERT_GE(fd, 0);
2053  // bind
2054  struct sockaddr_in addr;
2055  addr.sin_family = AF_INET;
2056  addr.sin_port = 0;
2057  addr.sin_addr.s_addr = INADDR_ANY;
2058  ASSERT_EQ(
2059  bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
2060  // Look up the address that we bound to
2061  folly::SocketAddress boundAddress;
2062  boundAddress.setFromLocalAddress(fd);
2063  // listen
2064  ASSERT_EQ(listen(fd, 16), 0);
2065 
2066  // Create a server socket
2067  AsyncServerSocket::UniquePtr serverSocket(
2068  new AsyncServerSocket(&eventBase));
2069  serverSocket->useExistingSocket(fd);
2070 
2071  // Make sure AsyncServerSocket reports the same address that we bound to
2072  folly::SocketAddress serverSocketAddress;
2073  serverSocket->getAddress(&serverSocketAddress);
2074  ASSERT_EQ(boundAddress, serverSocketAddress);
2075 
2076  // Make sure the socket works
2077  serverSocketSanityTest(serverSocket.get());
2078  }
2079 }
2080 
2081 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2082  EventBase eventBase;
2083 
2084  // Create a server socket
2085  std::shared_ptr<AsyncServerSocket> serverSocket(
2086  AsyncServerSocket::newSocket(&eventBase));
2087  string path(1, 0);
2088  path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2089  folly::SocketAddress serverAddress;
2090  serverAddress.setFromPath(path);
2091  serverSocket->bind(serverAddress);
2092  serverSocket->listen(16);
2093 
2094  // Add a callback to accept one connection then stop the loop
2095  TestAcceptCallback acceptCallback;
2096  acceptCallback.setConnectionAcceptedFn(
2097  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2098  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2099  });
2100  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2101  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2102  });
2103  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2104  serverSocket->startAccepting();
2105 
2106  // Connect to the server socket
2107  std::shared_ptr<AsyncSocket> socket(
2108  AsyncSocket::newSocket(&eventBase, serverAddress));
2109 
2110  eventBase.loop();
2111 
2112  // Verify that the server accepted a connection
2113  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2114  ASSERT_EQ(
2115  acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
2116  ASSERT_EQ(
2117  acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
2118  ASSERT_EQ(
2119  acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
2120  int fd = acceptCallback.getEvents()->at(1).fd;
2121 
2122  // The accepted connection should already be in non-blocking mode
2123  int flags = fcntl(fd, F_GETFL, 0);
2124  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2125 }
2126 
2127 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2128  EventBase eventBase;
2129  TestConnectionEventCallback connectionEventCallback;
2130 
2131  // Create a server socket
2132  std::shared_ptr<AsyncServerSocket> serverSocket(
2133  AsyncServerSocket::newSocket(&eventBase));
2134  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2135  serverSocket->bind(0);
2136  serverSocket->listen(16);
2137  folly::SocketAddress serverAddress;
2138  serverSocket->getAddress(&serverAddress);
2139 
2140  // Add a callback to accept one connection then stop the loop
2141  TestAcceptCallback acceptCallback;
2142  acceptCallback.setConnectionAcceptedFn(
2143  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2144  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2145  });
2146  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2147  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2148  });
2149  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2150  serverSocket->startAccepting();
2151 
2152  // Connect to the server socket
2153  std::shared_ptr<AsyncSocket> socket(
2154  AsyncSocket::newSocket(&eventBase, serverAddress));
2155 
2156  eventBase.loop();
2157 
2158  // Validate the connection event counters
2159  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2160  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2161  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2162  ASSERT_EQ(
2163  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2164  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2165  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2166  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2167  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2168 }
2169 
2170 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2171  EventBase eventBase;
2172  TestConnectionEventCallback connectionEventCallback;
2173 
2174  // Create a server socket
2175  std::shared_ptr<AsyncServerSocket> serverSocket(
2176  AsyncServerSocket::newSocket(&eventBase));
2177  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2178  serverSocket->bind(0);
2179  serverSocket->listen(16);
2180  folly::SocketAddress serverAddress;
2181  serverSocket->getAddress(&serverAddress);
2182 
2183  // Add a callback to accept one connection then stop the loop
2184  TestAcceptCallback acceptCallback;
2185  acceptCallback.setConnectionAcceptedFn(
2186  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2187  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2188  });
2189  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2190  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2191  });
2192  bool acceptStartedFlag{false};
2193  acceptCallback.setAcceptStartedFn(
2194  [&acceptStartedFlag]() { acceptStartedFlag = true; });
2195  bool acceptStoppedFlag{false};
2196  acceptCallback.setAcceptStoppedFn(
2197  [&acceptStoppedFlag]() { acceptStoppedFlag = true; });
2198  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2199  serverSocket->startAccepting();
2200 
2201  // Connect to the server socket
2202  std::shared_ptr<AsyncSocket> socket(
2203  AsyncSocket::newSocket(&eventBase, serverAddress));
2204 
2205  eventBase.loop();
2206 
2207  ASSERT_TRUE(acceptStartedFlag);
2208  ASSERT_TRUE(acceptStoppedFlag);
2209  // Validate the connection event counters
2210  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2211  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2212  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2213  ASSERT_EQ(
2214  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2215  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2216  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2217  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2218  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2219 }
2220 
2221 TEST(AsyncSocketTest, CallbackInSecondaryEventBase) {
2222  EventBase eventBase;
2223  TestConnectionEventCallback connectionEventCallback;
2224 
2225  // Create a server socket
2226  std::shared_ptr<AsyncServerSocket> serverSocket(
2227  AsyncServerSocket::newSocket(&eventBase));
2228  serverSocket->setConnectionEventCallback(&connectionEventCallback);
2229  serverSocket->bind(0);
2230  serverSocket->listen(16);
2231  SocketAddress serverAddress;
2232  serverSocket->getAddress(&serverAddress);
2233 
2234  // Add a callback to accept one connection then stop the loop
2235  TestAcceptCallback acceptCallback;
2236  ScopedEventBaseThread cobThread("ioworker_test");
2237  acceptCallback.setConnectionAcceptedFn(
2238  [&](int /* fd */, const SocketAddress& /* addr */) {
2239  eventBase.runInEventBaseThread([&] {
2240  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2241  });
2242  });
2243  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2244  eventBase.runInEventBaseThread(
2245  [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
2246  });
2247  std::atomic<bool> acceptStartedFlag{false};
2248  acceptCallback.setAcceptStartedFn([&]() { acceptStartedFlag = true; });
2249  Baton<> acceptStoppedFlag;
2250  acceptCallback.setAcceptStoppedFn([&]() { acceptStoppedFlag.post(); });
2251  serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
2252  serverSocket->startAccepting();
2253 
2254  // Connect to the server socket
2255  std::shared_ptr<AsyncSocket> socket(
2256  AsyncSocket::newSocket(&eventBase, serverAddress));
2257 
2258  eventBase.loop();
2259 
2260  ASSERT_TRUE(acceptStoppedFlag.try_wait_for(std::chrono::seconds(1)));
2261  ASSERT_TRUE(acceptStartedFlag);
2262  // Validate the connection event counters
2263  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2264  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2265  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2266  ASSERT_EQ(
2267  connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2268  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2269  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2270  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2271  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2272 }
2273 
2277 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2278  EventBase eventBase;
2279 
2280  // Counter of how many connections have been accepted
2281  int count = 0;
2282 
2283  // Create a server socket
2284  auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2285  serverSocket->bind(0);
2286  serverSocket->listen(16);
2287  folly::SocketAddress serverAddress;
2288  serverSocket->getAddress(&serverAddress);
2289 
2290  // Add a callback to accept connections
2291  folly::ScopedEventBaseThread cobThread("ioworker_test");
2292  TestAcceptCallback acceptCallback;
2293  acceptCallback.setConnectionAcceptedFn(
2294  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2295  count++;
2296  eventBase.runInEventBaseThreadAndWait([&] {
2297  ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2298  });
2299  if (count == 4) {
2300  eventBase.runInEventBaseThread([&] {
2301  serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2302  });
2303  }
2304  });
2305  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2306  eventBase.runInEventBaseThread(
2307  [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
2308  });
2309  serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
2310  serverSocket->startAccepting();
2311 
2312  // Connect to the server socket, 4 clients, there are 4 connections
2313  auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2314  auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2315  auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2316  auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2317 
2318  eventBase.loop();
2319  ASSERT_EQ(4, count);
2320 }
2321 
// Connects with an SO_SNDBUF option of 128, writes a 100 KiB buffer, and
// expects the BufferCallback to report that data was buffered. The socket is
// then closed and the server side is asked to verify the payload.
2325 TEST(AsyncSocketTest, BufferTest) {
2326  TestServer server;
2327 
2328  EventBase evb;
// Socket option passed to connect(): SOL_SOCKET / SO_SNDBUF set to 128.
2329  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2330  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2331  ConnCallback ccb;
2332  socket->connect(&ccb, server.getAddress(), 30, option);
2333 
// 100 KiB payload filled with 'c'.
2334  char buf[100 * 1024];
2335  memset(buf, 'c', sizeof(buf));
2336  WriteCallback wcb;
2337  BufferCallback bcb;
2338  socket->setBufferCallback(&bcb);
2339  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2340 
2341  evb.loop();
// NOTE(review): listing lines 2342-2343 are absent from this view (the
// embedded numbering jumps from 2341 to 2344); their content is not visible.
2344 
2345  ASSERT_TRUE(bcb.hasBuffered());
// NOTE(review): listing line 2346 is absent from this view.
2347 
2348  socket->close();
2349  server.verifyConnection(buf, sizeof(buf));
2350 
2351  ASSERT_TRUE(socket->isClosedBySelf());
2352  ASSERT_FALSE(socket->isClosedByPeer());
2353 }
2354 
// Regression-style test: the write-success callback drops the last
// shared_ptr reference to the socket while a BufferCallback is still
// installed. See the comment before the write() call for the call chain.
2355 TEST(AsyncSocketTest, BufferCallbackKill) {
2356  TestServer server;
2357  EventBase evb;
2358  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2359  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2360  ConnCallback ccb;
2361  socket->connect(&ccb, server.getAddress(), 30, option);
2362  evb.loopOnce();
2363 
2364  char buf[100 * 1024];
2365  memset(buf, 'c', sizeof(buf));
2366  BufferCallback bcb;
2367  socket->setBufferCallback(&bcb);
2368  WriteCallback wcb;
// On write success, assert this test holds the only reference and release it.
// NOTE(review): std::shared_ptr::unique() is deprecated in C++17 and removed
// in C++20; `socket.use_count() == 1` is the portable spelling.
2369  wcb.successCallback = [&] {
2370  ASSERT_TRUE(socket.unique());
2371  socket.reset();
2372  };
2373 
2374  // This will trigger AsyncSocket::handleWrite,
2375  // which calls WriteCallback::writeSuccess,
2376  // which calls wcb.successCallback above,
2377  // which tries to delete socket
2378  // Then, the socket will also try to use this BufferCallback
2379  // And that should crash us, if there is no DestructorGuard on the stack
2380  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2381 
2382  evb.loop();
// NOTE(review): listing line 2383 is absent from this view (2382 -> 2384).
2384 }
2385 
2386 #if FOLLY_ALLOW_TFO
// TFO-enabled client connects to a TestServer constructed with `true`.
// A helper thread accepts, sends 128 bytes of 'a', reads 3 bytes and closes.
// The client writes "hey" and the test checks both directions of the
// exchange plus the TFO attempted/finished/succeeded flags.
2387 TEST(AsyncSocketTest, ConnectTFO) {
2388  // Start listening on a local port
2389  TestServer server(true);
2390 
2391  // Connect using a AsyncSocket
2392  EventBase evb;
2393  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2394  socket->enableTFO();
2395  ConnCallback cb;
2396  socket->connect(&cb, server.getAddress(), 30);
2397 
// Payload the server side sends to the client.
2398  std::array<uint8_t, 128> buf;
2399  memset(buf.data(), 'a', buf.size());
2400 
// readBuf receives what the server reads; sendBuf is what the client writes.
2401  std::array<uint8_t, 3> readBuf;
2402  auto sendBuf = IOBuf::copyBuffer("hey");
2403 
// Server side runs on its own thread: accept, send buf, read 3 bytes, close.
2404  std::thread t([&] {
2405  auto acceptedSocket = server.accept();
2406  acceptedSocket->write(buf.data(), buf.size());
2407  acceptedSocket->flush();
2408  acceptedSocket->readAll(readBuf.data(), readBuf.size());
2409  acceptedSocket->close();
2410  });
2411 
2412  evb.loop();
2413 
// NOTE(review): listing line 2414 is absent from this view.
2415  EXPECT_LE(0, socket->getConnectTime().count());
2416  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2417  EXPECT_TRUE(socket->getTFOAttempted());
2418 
2419  // Should trigger the connect
// NOTE(review): listing line 2420 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2421  ReadCallback rcb;
2422  socket->writeChain(&write, sendBuf->clone());
2423  socket->setReadCB(&rcb);
2424  evb.loop();
2425 
2426  t.join();
2427 
2428  EXPECT_EQ(STATE_SUCCEEDED, write.state);
2429  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
// NOTE(review): listing line 2430 is absent from this view.
2431  ASSERT_EQ(1, rcb.buffers.size());
2432  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2433  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2434  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2435 }
2436 
// Variant of the TFO connect test in which the read callback is installed
// right after connect(), before the first write is issued.
2437 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2438  // Start listening on a local port
2439  TestServer server(true);
2440 
2441  // Connect using a AsyncSocket
2442  EventBase evb;
2443  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2444  socket->enableTFO();
2445  ConnCallback cb;
2446  socket->connect(&cb, server.getAddress(), 30);
// Read callback is set before any data has been written.
2447  ReadCallback rcb;
2448  socket->setReadCB(&rcb);
2449 
// Payload the server side sends to the client.
2450  std::array<uint8_t, 128> buf;
2451  memset(buf.data(), 'a', buf.size());
2452 
2453  std::array<uint8_t, 3> readBuf;
2454  auto sendBuf = IOBuf::copyBuffer("hey");
2455 
// Server side runs on its own thread: accept, send buf, read 3 bytes, close.
2456  std::thread t([&] {
2457  auto acceptedSocket = server.accept();
2458  acceptedSocket->write(buf.data(), buf.size());
2459  acceptedSocket->flush();
2460  acceptedSocket->readAll(readBuf.data(), readBuf.size());
2461  acceptedSocket->close();
2462  });
2463 
2464  evb.loop();
2465 
// NOTE(review): listing line 2466 is absent from this view.
2467  EXPECT_LE(0, socket->getConnectTime().count());
2468  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2469  EXPECT_TRUE(socket->getTFOAttempted());
2470 
2471  // Should trigger the connect
// NOTE(review): listing line 2472 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2473  socket->writeChain(&write, sendBuf->clone());
2474  evb.loop();
2475 
2476  t.join();
2477 
2478  EXPECT_EQ(STATE_SUCCEEDED, write.state);
2479  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
// NOTE(review): listing line 2480 is absent from this view.
2481  ASSERT_EQ(1, rcb.buffers.size());
2482  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2483  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2484  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2485 }
2486 
// TFO-enabled connect to [::1]:65535, where no listener is expected.
// Two writes are queued; the second must fail, while the expectation for the
// first depends on whether the socket reports the TFO attempt as finished.
2490 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2491  EventBase evb;
2492 
2493  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2494 
2495  socket->enableTFO();
2496 
2497  // Hopefully nothing is actually listening on this address
2498  folly::SocketAddress addr("::1", 65535);
2499  ConnCallback cb;
2500  socket->connect(&cb, addr, 30);
2501 
2502  evb.loop();
2503 
2504  WriteCallback write1;
2505  // Trigger the connect if TFO attempt is supported.
2506  socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2507  WriteCallback write2;
2508  socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2509  evb.loop();
2510 
// If TFO did not finish, the first write must have failed; otherwise it
// succeeded locally but TFO itself must not be reported as succeeded.
2511  if (!socket->getTFOFinished()) {
2512  EXPECT_EQ(STATE_FAILED, write1.state);
2513  } else {
2514  EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2515  EXPECT_FALSE(socket->getTFOSucceded());
2516  }
2517 
2518  EXPECT_EQ(STATE_FAILED, write2.state);
2519 
// NOTE(review): listing line 2520 is absent from this view.
2521  EXPECT_LE(0, socket->getConnectTime().count());
2522  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2523  EXPECT_TRUE(socket->getTFOAttempted());
2524 }
2525 
// TFO-enabled socket: connect() followed immediately by closeNow(), then
// checks that the socket reports being closed by itself and not by the peer.
2529 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2530  TestServer server(true);
2531 
2532  // connect()
2533  EventBase evb;
2534  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2535  socket->enableTFO();
2536 
2537  ConnCallback ccb;
2538  socket->connect(&ccb, server.getAddress(), 30);
2539 
2540  // write()
// NOTE(review): buf is filled here but no write call using it is visible in
// this block — confirm whether a write was intended.
2541  std::array<char, 128> buf;
2542  memset(buf.data(), 'a', buf.size());
2543 
2544  // close()
2545  socket->closeNow();
2546 
2547  // Loop, although there shouldn't be anything to do.
2548  evb.loop();
2549 
// NOTE(review): listing line 2550 is absent from this view.
2551 
2552  ASSERT_TRUE(socket->isClosedBySelf());
2553  ASSERT_FALSE(socket->isClosedByPeer());
2554 }
2555 
// TFO-enabled socket: connect() followed immediately by close(), then checks
// that the socket reports being closed by itself and not by the peer.
2559 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2560  TestServer server(true);
2561 
2562  // Connect using a AsyncSocket
2563  EventBase evb;
2564  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2565  socket->enableTFO();
2566 
2567  ConnCallback ccb;
2568  socket->connect(&ccb, server.getAddress(), 30);
2569 
2570  socket->close();
2571 
2572  // Loop, although there shouldn't be anything to do.
2573  evb.loop();
2574 
2575  // Make sure the connection was aborted
// NOTE(review): listing line 2576, which presumably held the check described
// by the comment above, is absent from this view.
2577 
2578  ASSERT_TRUE(socket->isClosedBySelf());
2579  ASSERT_FALSE(socket->isClosedByPeer());
2580 }
2581 
2582 class MockAsyncTFOSocket : public AsyncSocket {
2583  public:
2584  using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2585 
2586  explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2587 
2588  MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2589 };
2590 
// The mocked tfoSendMsg() fails once with EOPNOTSUPP. The write is expected
// to stay in STATE_WAITING right after writeChain(), and the data exchange
// with the server thread is still expected to complete in both directions.
2591 TEST(AsyncSocketTest, TestTFOUnsupported) {
2592  TestServer server(true);
2593 
2594  // Connect using a AsyncSocket
2595  EventBase evb;
2596  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2597  socket->enableTFO();
2598 
2599  ConnCallback ccb;
2600  socket->connect(&ccb, server.getAddress(), 30);
// NOTE(review): listing line 2601 is absent from this view.
2602 
2603  ReadCallback rcb;
2604  socket->setReadCB(&rcb);
2605 
// Make the single TFO send attempt fail with errno = EOPNOTSUPP.
2606  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2607  .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
// NOTE(review): listing line 2608 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2609  auto sendBuf = IOBuf::copyBuffer("hey");
2610  socket->writeChain(&write, sendBuf->clone());
2611  EXPECT_EQ(STATE_WAITING, write.state);
2612 
2613  std::array<uint8_t, 128> buf;
2614  memset(buf.data(), 'a', buf.size());
2615 
2616  std::array<uint8_t, 3> readBuf;
2617 
// Server side runs on its own thread: accept, send buf, read 3 bytes, close.
2618  std::thread t([&] {
2619  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2620  acceptedSocket->write(buf.data(), buf.size());
2621  acceptedSocket->flush();
2622  acceptedSocket->readAll(readBuf.data(), readBuf.size());
2623  acceptedSocket->close();
2624  });
2625 
2626  evb.loop();
2627 
2628  t.join();
// NOTE(review): listing lines 2629-2630 are absent from this view.
2631 
2632  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
// NOTE(review): listing line 2633 is absent from this view.
2634  ASSERT_EQ(1, rcb.buffers.size());
2635  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2636  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2637  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2638 }
2639 
// The mocked tfoSendMsg() performs a plain connect() to 127.0.0.1:65535,
// where no listener is expected. Both queued writes must end up failed and
// TFO must not be reported as succeeded. The test bails out early if the
// socket already reports TFO as finished after the first write.
2640 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2641  EventBase evb;
2642 
2643  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2644  socket->enableTFO();
2645 
2646  // Hopefully this fails
2647  folly::SocketAddress fakeAddr("127.0.0.1", 65535);
// Replace the TFO send with connect() on the same fd, and log the outcome.
// NOTE(review): errno is read while streaming into LOG, and the logging sits
// between connect() and the return; confirm that errno still holds
// connect()'s value when the caller inspects it.
2648  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2649  .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2650  sockaddr_storage addr;
2651  auto len = fakeAddr.getAddress(&addr);
2652  int ret = connect(fd, (const struct sockaddr*)&addr, len);
2653  LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
2654  << errno;
2655  return ret;
2656  }));
2657 
2658  // Hopefully nothing is actually listening on this address
2659  ConnCallback cb;
2660  socket->connect(&cb, fakeAddr, 30);
2661 
2662  WriteCallback write1;
2663  // Trigger the connect if TFO attempt is supported.
2664  socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2665 
2666  if (socket->getTFOFinished()) {
2667  // This test is useless now.
2668  return;
2669  }
2670  WriteCallback write2;
2671  // Trigger the connect if TFO attempt is supported.
2672  socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2673  evb.loop();
2674 
2675  EXPECT_EQ(STATE_FAILED, write1.state);
2676  EXPECT_EQ(STATE_FAILED, write2.state);
2677  EXPECT_FALSE(socket->getTFOSucceded());
2678 
// NOTE(review): listing line 2679 is absent from this view.
2680  EXPECT_LE(0, socket->getConnectTime().count());
2681  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2682  EXPECT_TRUE(socket->getTFOAttempted());
2683 }
2684 
// The mocked tfoSendMsg() fails once with EOPNOTSUPP while connecting, with a
// 1 ms timeout, to a host that is expected not to respond. The queued write
// must end in STATE_FAILED.
2685 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2686  // Try connecting to server that won't respond.
2687  //
2688  // This depends somewhat on the network where this test is run.
2689  // Hopefully this IP will be routable but unresponsive.
2690  // (Alternatively, we could try listening on a local raw socket, but that
2691  // normally requires root privileges.)
// NOTE(review): listing lines 2692-2695 are absent from this view; they
// presumably declare `host`, of which only the trailing `: nullptr;` remains.
2696  : nullptr;
2697  SocketAddress addr(host, 65535);
2698 
2699  // Connect using a AsyncSocket
2700  EventBase evb;
2701  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2702  socket->enableTFO();
2703 
2704  ConnCallback ccb;
2705  // Set a very small timeout
2706  socket->connect(&ccb, addr, 1);
// NOTE(review): listing line 2707 is absent from this view.
2708 
2709  ReadCallback rcb;
2710  socket->setReadCB(&rcb);
2711 
// Make the single TFO send attempt fail with errno = EOPNOTSUPP.
2712  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2713  .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
// NOTE(review): listing line 2714 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2715  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2716 
2717  evb.loop();
2718 
2719  EXPECT_EQ(STATE_FAILED, write.state);
2720 }
2721 
// The mocked tfoSendMsg() performs a plain connect() to the test server
// instead of a TFO send. The write is expected to be in STATE_WAITING right
// after writeChain(), and the exchange with the server thread must still
// complete in both directions.
2722 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2723  TestServer server(true);
2724 
2725  // Connect using a AsyncSocket
2726  EventBase evb;
2727  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2728  socket->enableTFO();
2729 
2730  ConnCallback ccb;
2731  socket->connect(&ccb, server.getAddress(), 30);
// NOTE(review): listing line 2732 is absent from this view.
2733 
2734  ReadCallback rcb;
2735  socket->setReadCB(&rcb);
2736 
// Replace the TFO send with connect() to the server's address on the same fd.
2737  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2738  .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2739  sockaddr_storage addr;
2740  auto len = server.getAddress().getAddress(&addr);
2741  return connect(fd, (const struct sockaddr*)&addr, len);
2742  }));
// NOTE(review): listing line 2743 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2744  auto sendBuf = IOBuf::copyBuffer("hey");
2745  socket->writeChain(&write, sendBuf->clone());
2746  EXPECT_EQ(STATE_WAITING, write.state);
2747 
2748  std::array<uint8_t, 128> buf;
2749  memset(buf.data(), 'a', buf.size());
2750 
2751  std::array<uint8_t, 3> readBuf;
2752 
// Server side runs on its own thread: accept, send buf, read 3 bytes, close.
2753  std::thread t([&] {
2754  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2755  acceptedSocket->write(buf.data(), buf.size());
2756  acceptedSocket->flush();
2757  acceptedSocket->readAll(readBuf.data(), readBuf.size());
2758  acceptedSocket->close();
2759  });
2760 
2761  evb.loop();
2762 
2763  t.join();
2764  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2765 
// NOTE(review): listing lines 2766-2767 are absent from this view.
2768 
// NOTE(review): listing line 2769 is absent from this view.
2770  ASSERT_EQ(1, rcb.buffers.size());
2771  ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2772  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2773 }
2774 
// The mocked tfoSendMsg() performs a plain connect() to a host that is
// expected not to respond, with a 1 ms connect timeout. The queued write must
// end in STATE_FAILED.
2775 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2776  // Try connecting to server that won't respond.
2777  //
2778  // This depends somewhat on the network where this test is run.
2779  // Hopefully this IP will be routable but unresponsive.
2780  // (Alternatively, we could try listening on a local raw socket, but that
2781  // normally requires root privileges.)
// NOTE(review): listing lines 2782-2785 are absent from this view; they
// presumably declare `host`, of which only the trailing `: nullptr;` remains.
2786  : nullptr;
2787  SocketAddress addr(host, 65535);
2788 
2789  // Connect using a AsyncSocket
2790  EventBase evb;
2791  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2792  socket->enableTFO();
2793 
2794  ConnCallback ccb;
2795  // Set a very small timeout
2796  socket->connect(&ccb, addr, 1);
// NOTE(review): listing line 2797 is absent from this view.
2798 
2799  ReadCallback rcb;
2800  socket->setReadCB(&rcb);
2801 
// Replace the TFO send with connect() to `addr` on the same fd.
2802  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2803  .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2804  sockaddr_storage addr2;
2805  auto len = addr.getAddress(&addr2);
2806  return connect(fd, (const struct sockaddr*)&addr2, len);
2807  }));
// NOTE(review): listing line 2808 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2809  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2810 
2811  evb.loop();
2812 
2813  EXPECT_EQ(STATE_FAILED, write.state);
2814 }
2815 
// The mocked tfoSendMsg() fails once with EAGAIN; the queued write is
// expected to end in STATE_FAILED.
2816 TEST(AsyncSocketTest, TestTFOEagain) {
2817  TestServer server(true);
2818 
2819  // Connect using a AsyncSocket
2820  EventBase evb;
2821  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2822  socket->enableTFO();
2823 
2824  ConnCallback ccb;
2825  socket->connect(&ccb, server.getAddress(), 30);
2826 
// Make the single TFO send attempt fail with errno = EAGAIN.
2827  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2828  .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
// NOTE(review): listing line 2829 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2830  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2831 
2832  evb.loop();
2833 
// NOTE(review): listing line 2834 is absent from this view.
2835  EXPECT_EQ(STATE_FAILED, write.state);
2836 }
2837 
2838 // Sending a large amount of data in the first write which will
2839 // definitely not fit into MSS.
// Same flow as the basic TFO connect test, except that the client's first
// write is a 10 KiB IOBuf and the server thread reads all 10 KiB back.
2840 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2841  // Start listening on a local port
2842  TestServer server(true);
2843 
2844  // Connect using a AsyncSocket
2845  EventBase evb;
2846  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2847  socket->enableTFO();
2848  ConnCallback cb;
2849  socket->connect(&cb, server.getAddress(), 30);
2850 
// Payload the server side sends to the client.
2851  std::array<uint8_t, 128> buf;
2852  memset(buf.data(), 'a', buf.size());
2853 
// 10 KiB client payload; append(len) extends the IOBuf's length to len
// without the block writing any bytes into it.
2854  constexpr size_t len = 10 * 1024;
2855  auto sendBuf = IOBuf::create(len);
2856  sendBuf->append(len);
2857  std::array<uint8_t, len> readBuf;
2858 
// Server side runs on its own thread: accept, send buf, read len bytes, close.
2859  std::thread t([&] {
2860  auto acceptedSocket = server.accept();
2861  acceptedSocket->write(buf.data(), buf.size());
2862  acceptedSocket->flush();
2863  acceptedSocket->readAll(readBuf.data(), readBuf.size());
2864  acceptedSocket->close();
2865  });
2866 
2867  evb.loop();
2868 
// NOTE(review): listing line 2869 is absent from this view.
2870  EXPECT_LE(0, socket->getConnectTime().count());
2871  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2872  EXPECT_TRUE(socket->getTFOAttempted());
2873 
2874  // Should trigger the connect
// NOTE(review): listing line 2875 is absent from this view; `write` is used
// below without a visible declaration, so it is presumably declared there.
2876  ReadCallback rcb;
2877  socket->writeChain(&write, sendBuf->clone());
2878  socket->setReadCB(&rcb);
2879  evb.loop();
2880 
2881  t.join();
2882 
2883  EXPECT_EQ(STATE_SUCCEEDED, write.state);
2884  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
// NOTE(review): listing line 2885 is absent from this view.
2886  ASSERT_EQ(1, rcb.buffers.size());
2887  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2888  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2889  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2890 }
2891 
2892 #endif // FOLLY_ALLOW_TFO
2893 
// NOTE(review): listing line 2894, which should hold the class header for
// this body, is absent from this view. The tests below instantiate
// MockEvbChangeCallback with these two methods, so this is presumably its
// body — confirm against the original file.
2895  public:
// gmock mock methods, each taking the AsyncSocket the notification is about.
2896  MOCK_METHOD1(evbAttached, void(AsyncSocket*));
2897  MOCK_METHOD1(evbDetached, void(AsyncSocket*));
2898 };
2899 
2900 TEST(AsyncSocketTest, EvbCallbacks) {
2901  auto cb = std::make_unique<MockEvbChangeCallback>();
2902  EventBase evb;
2903  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2904 
2905  InSequence seq;
2906  EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
2907  EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
2908 
2909  socket->setEvbChangedCallback(std::move(cb));
2910  socket->detachEventBase();
2911  socket->attachEventBase(&evb);
2912 }
2913 
// After a connection is established and a read callback is installed, the
// socket must still report isDetachable() and deliver evbDetached() followed
// by evbAttached() to the mock callback on detach/re-attach.
2914 TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
2915  // Start listening on a local port
2916  TestServer server;
2917 
2918  // Connect using a AsyncSocket
2919  EventBase evb;
2920  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2921  ConnCallback cb;
2922  socket->connect(&cb, server.getAddress(), 30);
2923 
2924  evb.loop();
2925 
// NOTE(review): listing line 2926 is absent from this view.
2927  EXPECT_LE(0, socket->getConnectTime().count());
2928  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2929 
2930  // After the ioHandlers are registered, still should be able to detach/attach
2931  ReadCallback rcb;
2932  socket->setReadCB(&rcb);
2933 
// Expect exactly one detach notification followed by one attach notification.
2934  auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
2935  InSequence seq;
2936  EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
2937  EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
2938 
2939  socket->setEvbChangedCallback(std::move(cbEvbChg));
2940  EXPECT_TRUE(socket->isDetachable());
2941  socket->detachEventBase();
2942  socket->attachEventBase(&evb);
2943 
2944  socket->close();
2945 }
2946 
2947 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
/*
 * Flag bits for the SO_TIMESTAMPING socket option, which takes an integer
 * bit field built from these values.
 * Copied from include/uapi/linux/net_tstamp.h.
 */
enum SOF_TIMESTAMPING {
  SOF_TIMESTAMPING_SOFTWARE = 1 << 4,
  SOF_TIMESTAMPING_OPT_ID = 1 << 7,
  SOF_TIMESTAMPING_TX_SCHED = 1 << 8,
  SOF_TIMESTAMPING_OPT_CMSG = 1 << 10,
  SOF_TIMESTAMPING_OPT_TSONLY = 1 << 11,
};
2957 
2958 class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
2959  public:
2960  TestErrMessageCallback()
2961  : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
2962 
2963  void errMessage(const cmsghdr& cmsg) noexcept override {
2964  if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
2965  gotTimestamp_++;
2966  checkResetCallback();
2967  } else if (
2968  (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
2969  (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
2970  gotByteSeq_++;
2971  checkResetCallback();
2972  }
2973  }
2974 
2975  void errMessageError(
2976  const folly::AsyncSocketException& ex) noexcept override {
2977  exception_ = ex;
2978  }
2979 
2980  void checkResetCallback() noexcept {
2981  if (socket_ != nullptr && resetAfter_ != -1 &&
2982  gotTimestamp_ + gotByteSeq_ == resetAfter_) {
2983  socket_->setErrMessageCB(nullptr);
2984  }
2985  }
2986 
2987  folly::AsyncSocket* socket_{nullptr};
2988  folly::AsyncSocketException exception_;
2989  int gotTimestamp_{0};
2990  int gotByteSeq_{0};
2991  int resetAfter_{-1};
2992 };
2993 
// Enables SO_TIMESTAMPING on a connected client socket, sends two writes,
// and checks that the TestErrMessageCallback saw both timestamp and
// byte-sequence messages, that their total equals resetAfter_ (3), and that
// no error was reported through errMessageError().
2994 TEST(AsyncSocketTest, ErrMessageCallback) {
2995  TestServer server;
2996 
2997  // connect()
2998  EventBase evb;
2999  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3000 
3001  ConnCallback ccb;
3002  socket->connect(&ccb, server.getAddress(), 30);
3003  LOG(INFO) << "Client socket fd=" << socket->getFd();
3004 
3005  // Run the event loop after initiating the connect
3006  evb.loop();
3007 
// NOTE(review): listing line 3008 is absent from this view.
3009 
3010  // Set read callback to keep the socket subscribed for event
3011  // notifications. Though we're not planning to read anything from
3012  // this side of the connection.
3013  ReadCallback rcb(1);
3014  socket->setReadCB(&rcb);
3015 
3016  // Set up timestamp callbacks
3017  TestErrMessageCallback errMsgCB;
3018  socket->setErrMessageCB(&errMsgCB);
3019  ASSERT_EQ(
3020  socket->getErrMessageCallback(),
3021  static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
3022 
// The callback unregisters itself from this socket after three messages.
3023  errMsgCB.socket_ = socket.get();
3024  errMsgCB.resetAfter_ = 3;
3025 
3026  // Enable timestamp notifications
3027  ASSERT_GT(socket->getFd(), 0);
3028  int flags = SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY |
3029  SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_CMSG |
3030  SOF_TIMESTAMPING_TX_SCHED;
3031  AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
3032  EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
3033 
3034  // write()
3035  std::vector<uint8_t> wbuf(128, 'a');
3036  WriteCallback wcb;
3037  // Send two packets to get two EOM notifications
3038  socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
3039  socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
3040 
3041  // Accept the connection.
3042  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
3043  LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
3044 
3045  // Loop
3046  evb.loopOnce();
// NOTE(review): listing line 3047 is absent from this view.
3048 
3049  // Check that we can read the data that was written to the socket
// rbuf is one byte larger than wbuf; bytesRead must equal wbuf.size().
3050  std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
3051  uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
3052  ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
3053  ASSERT_EQ(bytesRead, wbuf.size());
3054 
3055  // Close both sockets
3056  acceptedSocket->close();
3057  socket->close();
3058 
3059  ASSERT_TRUE(socket->isClosedBySelf());
3060  ASSERT_FALSE(socket->isClosedByPeer());
3061 
3062  // Check for the timestamp notifications.
// exception_ still having type UNKNOWN means errMessageError() never
// replaced the placeholder with a differently-typed error.
3063  ASSERT_EQ(
3064  errMsgCB.exception_.getType(), folly::AsyncSocketException::UNKNOWN);
3065  ASSERT_GT(errMsgCB.gotByteSeq_, 0);
3066  ASSERT_GT(errMsgCB.gotTimestamp_, 0);
3067  ASSERT_EQ(
3068  errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
3069 }
3070 #endif // FOLLY_HAVE_MSG_ERRQUEUE
3071 
3072 TEST(AsyncSocket, PreReceivedData) {
3073  TestServer server;
3074 
3075  EventBase evb;
3076  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3077  socket->connect(nullptr, server.getAddress(), 30);
3078  evb.loop();
3079 
3080  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3081 
3082  auto acceptedSocket = server.acceptAsync(&evb);
3083 
3084  ReadCallback peekCallback(2);
3085  ReadCallback readCallback;
3086  peekCallback.dataAvailableCallback = [&]() {
3087  peekCallback.verifyData("he", 2);
3088  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
3089  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
3090  acceptedSocket->setReadCB(nullptr);
3091  acceptedSocket->setReadCB(&readCallback);
3092  };
3093  readCallback.dataAvailableCallback = [&]() {
3094  if (readCallback.dataRead() == 5) {
3095  readCallback.verifyData("hello", 5);
3096  acceptedSocket->setReadCB(nullptr);
3097  }
3098  };
3099 
3100  acceptedSocket->setReadCB(&peekCallback);
3101 
3102  evb.loop();
3103 }
3104 
3105 TEST(AsyncSocket, PreReceivedDataOnly) {
3106  TestServer server;
3107 
3108  EventBase evb;
3109  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3110  socket->connect(nullptr, server.getAddress(), 30);
3111  evb.loop();
3112 
3113  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3114 
3115  auto acceptedSocket = server.acceptAsync(&evb);
3116 
3117  ReadCallback peekCallback;
3118  ReadCallback readCallback;
3119  peekCallback.dataAvailableCallback = [&]() {
3120  peekCallback.verifyData("hello", 5);
3121  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3122  acceptedSocket->setReadCB(&readCallback);
3123  };
3124  readCallback.dataAvailableCallback = [&]() {
3125  readCallback.verifyData("hello", 5);
3126  acceptedSocket->setReadCB(nullptr);
3127  };
3128 
3129  acceptedSocket->setReadCB(&peekCallback);
3130 
3131  evb.loop();
3132 }
3133 
3134 TEST(AsyncSocket, PreReceivedDataPartial) {
3135  TestServer server;
3136 
3137  EventBase evb;
3138  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3139  socket->connect(nullptr, server.getAddress(), 30);
3140  evb.loop();
3141 
3142  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3143 
3144  auto acceptedSocket = server.acceptAsync(&evb);
3145 
3146  ReadCallback peekCallback;
3147  ReadCallback smallReadCallback(3);
3148  ReadCallback normalReadCallback;
3149  peekCallback.dataAvailableCallback = [&]() {
3150  peekCallback.verifyData("hello", 5);
3151  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3152  acceptedSocket->setReadCB(&smallReadCallback);
3153  };
3154  smallReadCallback.dataAvailableCallback = [&]() {
3155  smallReadCallback.verifyData("hel", 3);
3156  acceptedSocket->setReadCB(&normalReadCallback);
3157  };
3158  normalReadCallback.dataAvailableCallback = [&]() {
3159  normalReadCallback.verifyData("lo", 2);
3160  acceptedSocket->setReadCB(nullptr);
3161  };
3162 
3163  acceptedSocket->setReadCB(&peekCallback);
3164 
3165  evb.loop();
3166 }
3167 
3168 TEST(AsyncSocket, PreReceivedDataTakeover) {
3169  TestServer server;
3170 
3171  EventBase evb;
3172  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3173  socket->connect(nullptr, server.getAddress(), 30);
3174  evb.loop();
3175 
3176  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3177 
3178  auto acceptedSocket =
3179  AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
3180  AsyncSocket::UniquePtr takeoverSocket;
3181 
3182  ReadCallback peekCallback(3);
3183  ReadCallback readCallback;
3184  peekCallback.dataAvailableCallback = [&]() {
3185  peekCallback.verifyData("hel", 3);
3186  acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3187  acceptedSocket->setReadCB(nullptr);
3188  takeoverSocket =
3189  AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
3190  takeoverSocket->setReadCB(&readCallback);
3191  };
3192  readCallback.dataAvailableCallback = [&]() {
3193  readCallback.verifyData("hello", 5);
3194  takeoverSocket->setReadCB(nullptr);
3195  };
3196 
3197  acceptedSocket->setReadCB(&peekCallback);
3198 
3199  evb.loop();
3200 }
3201 
3202 #ifdef MSG_NOSIGNAL
// Installs a TestSendMsgParamsCallback on a connected socket and performs two
// writes with different flag sets; the second adds MSG_MORE, so the peer is
// expected to have received only the first write's bytes when it reads.
3203 TEST(AsyncSocketTest, SendMessageFlags) {
3204  TestServer server;
3205  TestSendMsgParamsCallback sendMsgCB(
3206  MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE, 0, nullptr);
3207 
3208  // connect()
3209  EventBase evb;
3210  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3211 
3212  ConnCallback ccb;
3213  socket->connect(&ccb, server.getAddress(), 30);
3214  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
3215 
3216  evb.loop();
      // NOTE(review): source line 3217 is absent from this listing -- confirm
      // against the upstream file.
3218 
3219  // Set SendMsgParamsCallback
3220  socket->setSendMsgParamCB(&sendMsgCB);
3221  ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
3222 
3223  // Write the first portion of data. This data is expected to be
3224  // sent out immediately.
3225  std::vector<uint8_t> buf(128, 'a');
3226  WriteCallback wcb;
3227  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
3228  socket->write(&wcb, buf.data(), buf.size());
      // NOTE(review): source line 3229 is absent from this listing -- confirm
      // against the upstream file.
3230  ASSERT_TRUE(sendMsgCB.queriedFlags_);
3231  ASSERT_FALSE(sendMsgCB.queriedData_);
3232 
3233  // Using different flags for the second write operation.
3234  // MSG_MORE flag is expected to delay sending this
3235  // data to the wire.
3236  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
3237  socket->write(&wcb, buf.data(), buf.size());
      // NOTE(review): source line 3238 is absent from this listing -- confirm
      // against the upstream file.
3239  ASSERT_TRUE(sendMsgCB.queriedFlags_);
3240  ASSERT_FALSE(sendMsgCB.queriedData_);
3241 
3242  // Make sure the accepted socket saw only the data from
3243  // the first write request. The read buffer has room for both writes,
      // so a byte count of buf.size() means the second one was held back.
3244  std::vector<uint8_t> readbuf(2 * buf.size());
3245  uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
3246  ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
3247  ASSERT_EQ(bytesRead, buf.size());
3248 
3249  // Close both ends and check that the client records a self-initiated close
3250  acceptedSocket->close();
3251  socket->close();
3252 
3253  ASSERT_TRUE(socket->isClosedBySelf());
3254  ASSERT_FALSE(socket->isClosedByPeer());
3255 }
3256 
// Passes a file descriptor over a Unix-domain socket pair as SCM_RIGHTS
// ancillary data supplied through TestSendMsgParamsCallback, then reads the
// magic string back through the received descriptor.
3257 TEST(AsyncSocketTest, SendMessageAncillaryData) {
      // NOTE(review): fds is uninitialized and the socketpair() check is
      // non-fatal, so the ASSERT_NE checks below would read indeterminate
      // values if socketpair() failed.
3258  int fds[2];
3259  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
3260 
3261  // "Client" socket
3262  int cfd = fds[0];
3263  ASSERT_NE(cfd, -1);
3264 
3265  // "Server" socket
3266  int sfd = fds[1];
3267  ASSERT_NE(sfd, -1);
3268  SCOPE_EXIT {
3269  close(sfd);
3270  };
3271 
3272  // Instantiate AsyncSocket object for the connected socket
3273  EventBase evb;
3274  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
3275 
3276  // Open a temporary file and write a magic string to it
3277  // We'll transfer the file handle to test the message parameters
3278  // callback logic.
3279  TemporaryFile file(
      // NOTE(review): source line 3280 (the TemporaryFile constructor
      // arguments) is absent from this listing -- confirm against the
      // upstream file.
3281  int tmpfd = file.fd();
3282  ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
3283  std::string magicString("Magic string");
3284  ASSERT_EQ(
3285  write(tmpfd, magicString.c_str(), magicString.length()),
3286  magicString.length());
3287 
3288  // Send message: build a single SCM_RIGHTS control message carrying tmpfd.
      // The union gives the char buffer the alignment of a cmsghdr.
3289  union {
3290  // Space large enough to hold an 'int'
3291  char control[CMSG_SPACE(sizeof(int))];
3292  struct cmsghdr cmh;
3293  } s_u;
3294  s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
3295  s_u.cmh.cmsg_level = SOL_SOCKET;
3296  s_u.cmh.cmsg_type = SCM_RIGHTS;
3297  memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
3298 
3299  // Set up the callback providing message parameters
3300  TestSendMsgParamsCallback sendMsgCB(
3301  MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
3302  socket->setSendMsgParamCB(&sendMsgCB);
3303 
3304  // We must transmit at least 1 byte of real data in order
3305  // to send ancillary data
3306  int s_data = 12345;
3307  WriteCallback wcb;
3308  socket->write(&wcb, &s_data, sizeof(s_data));
      // NOTE(review): source line 3309 is absent from this listing -- confirm
      // against the upstream file.
3310 
3311  // Receive the message
3312  union {
3313  // Space large enough to hold an 'int'
3314  char control[CMSG_SPACE(sizeof(int))];
3315  struct cmsghdr cmh;
3316  } r_u;
3317  struct msghdr msgh;
3318  struct iovec iov;
3319  int r_data = 0;
3320 
      // One iovec for the int payload, plus the control buffer for the
      // ancillary data; no peer address is requested.
3321  msgh.msg_control = r_u.control;
3322  msgh.msg_controllen = sizeof(r_u.control);
3323  msgh.msg_name = nullptr;
3324  msgh.msg_namelen = 0;
3325  msgh.msg_iov = &iov;
3326  msgh.msg_iovlen = 1;
3327  iov.iov_base = &r_data;
3328  iov.iov_len = sizeof(r_data);
3329 
3330  // Receive data
3331  ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
3332 
3333  // Validate the received message
3334  ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
3335  ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
3336  ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
3337  ASSERT_EQ(r_data, s_data);
      // Extract the transferred descriptor; it is closed on scope exit.
3338  int fd = 0;
3339  memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
3340  ASSERT_NE(fd, 0);
3341  SCOPE_EXIT {
3342  close(fd);
3343  };
3344 
3345  std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
3346 
3347  // Reposition to the beginning of the file
3348  ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
3349 
3350  // Read the magic string back, and compare it with the original
3351  ASSERT_EQ(
3352  magicString.length(),
3353  read(fd, transferredMagicString.data(), transferredMagicString.size()));
3354  ASSERT_TRUE(std::equal(
3355  magicString.begin(), magicString.end(), transferredMagicString.begin()));
3356 }
3357 
3358 TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
3359  // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
3360  // Socket (UDS) does not support MSG_ERRQUEUE. So
3361  // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
3362  // breaks application message flow. To avoid this problem,
3363  // AsyncSocket currently disables setErrMessageCB for UDS.
3364  //
3365  // This tests two things for UDS
3366  // 1. setErrMessageCB fails
3367  // 2. recvmsg(MSG_ERRQUEUE) reads application data
3368  //
3369  // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
3370 
3371  int fd[2];
3372  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
3373  ASSERT_NE(fd[0], -1);
3374  ASSERT_NE(fd[1], -1);
3375  SCOPE_EXIT {
3376  close(fd[1]);
3377  };
3378 
3379  EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
3380  EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
3381 
3382  EventBase evb;
3383  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
3384 
3385  // setErrMessageCB should fail for unix domain socket
3386  TestErrMessageCallback errMsgCB;
3387  ASSERT_NE(&errMsgCB, nullptr);
3388  socket->setErrMessageCB(&errMsgCB);
3389  ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
3390 
3391 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
3392  // The following verifies that MSG_ERRQUEUE does not work for UDS,
3393  // and recvmsg reads application data
3394  union {
3395  // Space large enough to hold an 'int'
3396  char control[CMSG_SPACE(sizeof(int))];
3397  struct cmsghdr cmh;
3398  } r_u;
3399  struct msghdr msgh;
3400  struct iovec iov;
3401  int recv_data = 0;
3402 
3403  msgh.msg_control = r_u.control;
3404  msgh.msg_controllen = sizeof(r_u.control);
3405  msgh.msg_name = nullptr;
3406  msgh.msg_namelen = 0;
3407  msgh.msg_iov = &iov;
3408  msgh.msg_iovlen = 1;
3409  iov.iov_base = &recv_data;
3410  iov.iov_len = sizeof(recv_data);
3411 
3412  // there is no data, recvmsg should fail
3413  EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
3414  EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
3415 
3416  // provide some application data, error queue should be empty if it exists
3417  // However, UDS reads application data as error message
3418  int test_data = 123456;
3419  WriteCallback wcb;
3420  socket->write(&wcb, &test_data, sizeof(test_data));
3421  recv_data = 0;
3422  ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
3423  ASSERT_EQ(recv_data, test_data);
3424 #endif // FOLLY_HAVE_MSG_ERRQUEUE
3425 }
3426 
3427 TEST(AsyncSocketTest, V6TosReflectTest) {
3428  EventBase eventBase;
3429 
3430  // Create a server socket
3431  std::shared_ptr<AsyncServerSocket> serverSocket(
3432  AsyncServerSocket::newSocket(&eventBase));
3433  folly::IPAddress ip("::1");
3434  std::vector<folly::IPAddress> serverIp;
3435  serverIp.push_back(ip);
3436  serverSocket->bind(serverIp, 0);
3437  serverSocket->listen(16);
3438  folly::SocketAddress serverAddress;
3439  serverSocket->getAddress(&serverAddress);
3440 
3441  // Enable TOS reflect
3442  serverSocket->setTosReflect(true);
3443 
3444  // Add a callback to accept one connection then stop the loop
3445  TestAcceptCallback acceptCallback;
3446  acceptCallback.setConnectionAcceptedFn(
3447  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
3448  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3449  });
3450  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
3451  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3452  });
3453  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
3454  serverSocket->startAccepting();
3455 
3456  // Create a client socket, setsockopt() the TOS before connecting
3457  auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
3458  ConnCallback* ccb,
3459  EventBase* evb,
3460  folly::SocketAddress sAddr) {
3461  clientSock = AsyncSocket::newSocket(evb);
3462  AsyncSocket::OptionKey v6Opts = {IPPROTO_IPV6, IPV6_TCLASS};
3463  AsyncSocket::OptionMap optionMap;
3464  optionMap.insert({v6Opts, 0x2c});
3465  SocketAddress bindAddr("0.0.0.0", 0);
3466  clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
3467  };
3468 
3469  std::shared_ptr<AsyncSocket> socket(nullptr);
3470  ConnCallback cb;
3471  clientThread(socket, &cb, &eventBase, serverAddress);
3472 
3473  eventBase.loop();
3474 
3475  // Verify if the connection is accepted and if the accepted socket has
3476  // setsockopt on the TOS for the same value that was on the client socket
3477  int fd = acceptCallback.getEvents()->at(1).fd;
3478  ASSERT_GE(fd, 0);
3479  int value;
3480  socklen_t valueLength = sizeof(value);
3481  int rc = getsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &value, &valueLength);
3482  ASSERT_EQ(rc, 0);
3483  ASSERT_EQ(value, 0x2c);
3484 }
3485 
3486 TEST(AsyncSocketTest, V4TosReflectTest) {
3487  EventBase eventBase;
3488 
3489  // Create a server socket
3490  std::shared_ptr<AsyncServerSocket> serverSocket(
3491  AsyncServerSocket::newSocket(&eventBase));
3492  folly::IPAddress ip("127.0.0.1");
3493  std::vector<folly::IPAddress> serverIp;
3494  serverIp.push_back(ip);
3495  serverSocket->bind(serverIp, 0);
3496  serverSocket->listen(16);
3497  folly::SocketAddress serverAddress;
3498  serverSocket->getAddress(&serverAddress);
3499 
3500  // Enable TOS reflect
3501  serverSocket->setTosReflect(true);
3502 
3503  // Add a callback to accept one connection then stop the loop
3504  TestAcceptCallback acceptCallback;
3505  acceptCallback.setConnectionAcceptedFn(
3506  [&](int /* fd */, const folly::SocketAddress& /* addr */) {
3507  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3508  });
3509  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
3510  serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
3511  });
3512  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
3513  serverSocket->startAccepting();
3514 
3515  // Create a client socket, setsockopt() the TOS before connecting
3516  auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
3517  ConnCallback* ccb,
3518  EventBase* evb,
3519  folly::SocketAddress sAddr) {
3520  clientSock = AsyncSocket::newSocket(evb);
3521  AsyncSocket::OptionKey v4Opts = {IPPROTO_IP, IP_TOS};
3522  AsyncSocket::OptionMap optionMap;
3523  optionMap.insert({v4Opts, 0x2c});
3524  SocketAddress bindAddr("0.0.0.0", 0);
3525  clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
3526  };
3527 
3528  std::shared_ptr<AsyncSocket> socket(nullptr);
3529  ConnCallback cb;
3530  clientThread(socket, &cb, &eventBase, serverAddress);
3531 
3532  eventBase.loop();
3533 
3534  // Verify if the connection is accepted and if the accepted socket has
3535  // setsockopt on the TOS for the same value that was on the client socket
3536  int fd = acceptCallback.getEvents()->at(1).fd;
3537  ASSERT_GE(fd, 0);
3538  int value;
3539  socklen_t valueLength = sizeof(value);
3540  int rc = getsockopt(fd, IPPROTO_IP, IP_TOS, &value, &valueLength);
3541  ASSERT_EQ(rc, 0);
3542  ASSERT_EQ(value, 0x2c);
3543 }
3544 #endif
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_GE(val1, val2)
Definition: gtest.h:1972
#define ASSERT_GT(val1, val2)
Definition: gtest.h:1976
void setFromPath(StringPiece path)
flags
Definition: http_parser.h:127
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
Definition: Types-inl.h:220
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
void setConnectionAcceptedFn(const std::function< void(int, const folly::SocketAddress &)> &fn)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
void timeoutExpired() noexceptoverride
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
folly::AsyncSocketException exception
EventBase * getEventBase() const override
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
virtual bool isClosedByPeer() const
Definition: AsyncSocket.h:571
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::shared_ptr< BlockingSocket > accept(int timeout=50)
folly::AsyncSocketException exception
std::deque< EventInfo > * getEvents()
void appendChain(std::unique_ptr< IOBuf > &&iobuf)
Definition: IOBuf.h:827
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
std::vector< TFOState > getTestingValues()
STL namespace.
#define ASSERT_LE(val1, val2)
Definition: gtest.h:1964
INSTANTIATE_TEST_CASE_P(ConnectTests, AsyncSocketConnectTest,::testing::ValuesIn(getTestingValues()))
std::vector< Buffer > buffers
Gen seq(Value first, Value last)
Definition: Base.h:484
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
unsigned int getConnectionAcceptedError() const
bool hasBuffered() const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::unique_ptr< IOBuf > clone() const
Definition: IOBuf.cpp:527
auto msvcSuppressAbortOnInvalidParams(Func func) -> decltype(func())
Definition: TestUtil.h:166
requires E e noexcept(noexcept(s.error(std::move(e))))
#define SKIP()
Definition: TestUtils.h:55
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
void getAddress(SocketAddress *addressReturn) const override
DelayedWrite(const std::shared_ptr< AsyncSocket > &socket, unique_ptr< IOBuf > &&bufs, AsyncTransportWrapper::WriteCallback *wcb, bool cork, bool lastWrite=false)
std::shared_ptr< AsyncSocket > socket_
void testConnectOptWrite(size_t size1, size_t size2, bool close=false)
void setPort(uint16_t port)
AsyncTransportWrapper::WriteCallback * wcb_
unique_ptr< IOBuf > bufs_
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
#define MOCK_METHOD3(m,...)
VoidCallback successCallback
FOLLY_ALWAYS_INLINE bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout, const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:206
virtual void handleRead() noexcept
void setMaxReadsPerEvent(uint16_t maxReads)
Definition: AsyncSocket.h:451
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
VoidCallback successCallback
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
static constexpr const char * kGooglePublicDnsAAddrIPv6
LogLevel min
Definition: LogLevel.cpp:30
StateEnum state
bool hasBufferCleared() const
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
uint8_t * writableData()
Definition: IOBuf.h:509
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
void setAcceptStartedFn(const std::function< void()> &fn)
void terminateLoopSoon()
Definition: EventBase.cpp:493
AsyncServerSocket::UniquePtr socket_
TEST_P(AsyncSocketConnectTest, ConnectAndWrite)
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
void checkForImmediateRead() noexceptoverride
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
void shutdown(Counter &)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
int acceptFD(int timeout=50)
StateEnum state
void tmpDisableReads(AsyncSocket *socket, ReadCallback *rcb)
auto start
void post() noexcept
Definition: Baton.h:123
TEST(ProgramOptionsTest, Errors)
int * count
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
Definition: NetOps.cpp:268
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
Definition: AsyncSocket.h:281
void setReadCB(ReadCallback *callback) override
#define MOCK_METHOD1(m,...)
const folly::SocketAddress & getAddress() const
void setFromLocalAddress(int socket)
void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase)
const char * string
Definition: Conv.cpp:212
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static constexpr const char * kGooglePublicDnsAAddrIPv4
void close() override
EventBase * getEventBase() const override
Definition: AsyncSocket.h:328
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
bool scheduleTimeout(uint32_t milliseconds)
unsigned int getConnectionEnqueuedForAcceptCallback() const
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define ASSERT_NE(val1, val2)
Definition: gtest.h:1960
void serverSocketSanityTest(AsyncServerSocket *serverSocket)
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
virtual bool isClosedBySelf() const
Definition: AsyncSocket.h:577
PolymorphicAction< internal::SetErrnoAndReturnAction< T > > SetErrnoAndReturn(int errval, T result)
Range< const char * > StringPiece
static uint64_t rand64()
Definition: Random.h:263
AsyncSocketExceptionType getType() const noexcept
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
void setAcceptErrorFn(const std::function< void(const std::exception &)> &fn)
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
int close(NetworkSocket s)
Definition: NetOps.cpp:90
ThreadPoolListHook * addr
void verifyConnection(const char *buf, size_t len)
int apply(int fd, int val) const
Definition: AsyncSocket.h:368
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
#define T_CHECK_TIMEOUT(start, end, expectedMS,...)
Definition: Util.h:38
option(BUILD_SHARED_LIBS"Build shared libraries (DLLs)."OFF) option(gmock_build_tests"Build all of Google Mock's own tests."OFF) if(EXISTS"$
Definition: CMakeLists.txt:10
void setAcceptStoppedFn(const std::function< void()> &fn)
AsyncSocketImmediateRead(folly::EventBase *evb)
unsigned int getConnectionDequeuedByAcceptCallback() const
void append(std::size_t amount)
Definition: IOBuf.h:689
int socketpair(int domain, int type, int protocol, NetworkSocket sv[2])
Definition: NetOps.cpp:416
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)
std::atomic< size_t > bytesWritten