proxygen
AsyncSocket.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/ExceptionWrapper.h>
19 #include <folly/Format.h>
20 #include <folly/Portability.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/String.h>
23 #include <folly/io/Cursor.h>
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/IOBufQueue.h>
30 
31 #include <boost/preprocessor/control/if.hpp>
32 #include <errno.h>
33 #include <limits.h>
34 #include <sys/types.h>
35 #include <thread>
36 
37 #if FOLLY_HAVE_VLA
38 #define FOLLY_HAVE_VLA_01 1
39 #else
40 #define FOLLY_HAVE_VLA_01 0
41 #endif
42 
43 using std::string;
44 using std::unique_ptr;
45 
47 
48 namespace folly {
49 
// Compile-time flag: true when FOLLY_HAVE_MSG_ERRQUEUE is defined and false
// otherwise. Code in this file that depends on the socket error queue
// (setErrMessageCB(), setZeroCopy()) tests this flag before doing any work.
50 static constexpr bool msgErrQueueSupported =
51 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
52  true;
53 #else
54  false;
55 #endif // FOLLY_HAVE_MSG_ERRQUEUE
56 
57 // static members initializers
59 
62  "socket closed locally");
65  "socket shutdown for writes");
66 
67 // TODO: It might help performance to provide a version of BytesWriteRequest
68 // that users could derive from, so we can avoid the extra allocation for each
69 // call to write()/writev().
70 //
71 // We would need the version for external users where they provide the iovec
72 // storage space, and only our internal version would allocate it at the end of
73 // the WriteRequest.
74 
75 /* The default WriteRequest implementation, used for write(), writev() and
76  * writeChain()
77  *
78  * A new BytesWriteRequest operation is allocated on the heap for all write
79  * operations that cannot be completed immediately.
80  */
82  public:
85  WriteCallback* callback,
86  const iovec* ops,
87  uint32_t opCount,
88  uint32_t partialWritten,
90  unique_ptr<IOBuf>&& ioBuf,
91  WriteFlags flags) {
92  assert(opCount > 0);
93  // Since we put a variable size iovec array at the end
94  // of each BytesWriteRequest, we have to manually allocate the memory.
95  void* buf =
96  malloc(sizeof(BytesWriteRequest) + (opCount * sizeof(struct iovec)));
97  if (buf == nullptr) {
98  throw std::bad_alloc();
99  }
100 
101  return new (buf) BytesWriteRequest(
102  socket,
103  callback,
104  ops,
105  opCount,
106  partialWritten,
107  bytesWritten,
108  std::move(ioBuf),
109  flags);
110  }
111 
// Tears down this request. The object lives in a buffer obtained from
// malloc() and was constructed with placement new (see the allocation code
// earlier in this class), so the destructor is invoked explicitly and the
// raw storage is then released with free() instead of delete.
112  void destroy() override {
113  this->~BytesWriteRequest();
114  free(this);
115  }
116 
118  WriteFlags writeFlags = flags_;
119  if (getNext() != nullptr) {
120  writeFlags |= WriteFlags::CORK;
121  }
122 
123  socket_->adjustZeroCopyFlags(writeFlags);
124 
125  auto writeResult = socket_->performWrite(
126  getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
127  bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
128  if (bytesWritten_) {
129  if (socket_->isZeroCopyRequest(writeFlags)) {
130  if (isComplete()) {
132  } else {
133  socket_->addZeroCopyBuf(ioBuf_.get());
134  }
135  } else {
136  // this happens if at least one of the previous requests was sent
137  // with zero copy but not the last one
138  if (isComplete() && socket_->getZeroCopy() &&
141  }
142  }
143  }
144  return writeResult;
145  }
146 
// Returns true when the value recorded in opsWritten_ equals the number of
// ops still outstanding (getOpCount()), i.e. nothing is left to send.
147  bool isComplete() override {
148  return opsWritten_ == getOpCount();
149  }
150 
151  void consume() override {
152  // Advance opIndex_ forward by opsWritten_
154  assert(opIndex_ < opCount_);
155 
157  // If we've finished writing any IOBufs, release them
158  if (ioBuf_) {
159  for (uint32_t i = opsWritten_; i != 0; --i) {
160  assert(ioBuf_);
161  ioBuf_ = ioBuf_->pop();
162  }
163  }
164  }
165 
166  // Move partialBytes_ forward into the current iovec buffer
167  struct iovec* currentOp = writeOps_ + opIndex_;
168  assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
169  currentOp->iov_base =
170  reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
171  currentOp->iov_len -= partialBytes_;
172 
173  // Increment the totalBytesWritten_ count by bytesWritten_;
174  assert(bytesWritten_ >= 0);
176  }
177 
178  private:
181  WriteCallback* callback,
182  const struct iovec* ops,
183  uint32_t opCount,
184  uint32_t partialBytes,
186  unique_ptr<IOBuf>&& ioBuf,
188  : AsyncSocket::WriteRequest(socket, callback),
189  opCount_(opCount),
190  opIndex_(0),
191  flags_(flags),
192  ioBuf_(std::move(ioBuf)),
193  opsWritten_(0),
194  partialBytes_(partialBytes),
195  bytesWritten_(bytesWritten) {
196  memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
197  }
198 
199  // private destructor, to ensure callers use destroy()
200  ~BytesWriteRequest() override = default;
201 
// Returns the iovec array starting at the current position, i.e. writeOps_
// offset by opIndex_. Asserts that opIndex_ is still inside the array.
202  const struct iovec* getOps() const {
203  assert(opCount_ > opIndex_);
204  return writeOps_ + opIndex_;
205  }
206 
208  assert(opCount_ > opIndex_);
209  return opCount_ - opIndex_;
210  }
211 
215  unique_ptr<IOBuf> ioBuf_;
216 
217  // for consume(), how much we wrote on the last write
220  ssize_t bytesWritten_;
221 
222  struct iovec writeOps_[];
223 };
224 
227  bool zeroCopyEnabled) noexcept {
228  int msg_flags = MSG_DONTWAIT;
229 
230 #ifdef MSG_NOSIGNAL // Linux-only
231  msg_flags |= MSG_NOSIGNAL;
232 #ifdef MSG_MORE
233  if (isSet(flags, WriteFlags::CORK)) {
234  // MSG_MORE tells the kernel we have more data to send, so wait for us to
235  // give it the rest of the data rather than immediately sending a partial
236  // frame, even when TCP_NODELAY is enabled.
237  msg_flags |= MSG_MORE;
238  }
239 #endif // MSG_MORE
240 #endif // MSG_NOSIGNAL
241  if (isSet(flags, WriteFlags::EOR)) {
242  // marks that this is the last byte of a record (response)
243  msg_flags |= MSG_EOR;
244  }
245 
246  if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
247  msg_flags |= MSG_ZEROCOPY;
248  }
249 
250  return msg_flags;
251 }
252 
253 namespace {
254 static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
255 
256 // Based on flags, signal the transparent handler to disable certain functions
//
// fd               - socket descriptor the options are applied to.
// noTransparentTls - when true, SO_NO_TRANSPARENT_TLS is set on fd.
// noTSocks         - when true, SO_NO_TSOCKS is set on fd.
//
// The setsockopt() calls are only compiled in when __linux__ is set; in any
// other build the function does nothing. Return values of setsockopt() are
// deliberately ignored.
257 void disableTransparentFunctions(int fd, bool noTransparentTls, bool noTSocks) {
// The casts to void keep the parameters "used" when the Linux-only section
// below is compiled out.
258  (void)fd;
259  (void)noTransparentTls;
260  (void)noTSocks;
261 #if __linux__
262  if (noTransparentTls) {
263  // Ignore return value, errors are ok
264  VLOG(5) << "Disabling TTLS for fd " << fd;
265  ::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
266  }
267  if (noTSocks) {
268  VLOG(5) << "Disabling TSOCKS for fd " << fd;
269  // Ignore return value, errors are ok
270  ::setsockopt(fd, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
271  }
272 #endif
273 }
274 
275 } // namespace
276 
278  : eventBase_(nullptr),
279  writeTimeout_(this, nullptr),
280  ioHandler_(this, nullptr),
281  immediateReadHandler_(this) {
282  VLOG(5) << "new AsyncSocket()";
283  init();
284 }
285 
287  : eventBase_(evb),
288  writeTimeout_(this, evb),
289  ioHandler_(this, evb),
290  immediateReadHandler_(this) {
291  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
292  init();
293 }
294 
296  EventBase* evb,
297  const folly::SocketAddress& address,
298  uint32_t connectTimeout)
299  : AsyncSocket(evb) {
300  connect(nullptr, address, connectTimeout);
301 }
302 
304  EventBase* evb,
305  const std::string& ip,
306  uint16_t port,
307  uint32_t connectTimeout)
308  : AsyncSocket(evb) {
309  connect(nullptr, ip, port, connectTimeout);
310 }
311 
312 AsyncSocket::AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId)
313  : zeroCopyBufId_(zeroCopyBufId),
314  eventBase_(evb),
315  writeTimeout_(this, evb),
316  ioHandler_(this, evb, fd),
317  immediateReadHandler_(this) {
318  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" << fd
319  << ", zeroCopyBufId=" << zeroCopyBufId << ")";
320  init();
321  fd_ = fd;
322  disableTransparentFunctions(fd_, noTransparentTls_, noTSocks_);
323  setCloseOnExec();
325 }
326 
328  : AsyncSocket(
329  oldAsyncSocket->getEventBase(),
330  oldAsyncSocket->detachFd(),
331  oldAsyncSocket->getZeroCopyBufId()) {
332  preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
333 }
334 
335 // init() method, since constructor forwarding isn't supported in most
336 // compilers yet.
338  if (eventBase_) {
340  }
341  shutdownFlags_ = 0;
344  fd_ = -1;
345  sendTimeout_ = 0;
346  maxReadsPerEvent_ = 16;
347  connectCallback_ = nullptr;
348  errMessageCallback_ = nullptr;
349  readCallback_ = nullptr;
350  writeReqHead_ = nullptr;
351  writeReqTail_ = nullptr;
352  wShutdownSocketSet_.reset();
353  appBytesWritten_ = 0;
354  appBytesReceived_ = 0;
355  sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
356 }
357 
359  VLOG(7) << "actual destruction of AsyncSocket(this=" << this
360  << ", evb=" << eventBase_ << ", fd=" << fd_ << ", state=" << state_
361  << ")";
362 }
363 
365  VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
366  << ", fd=" << fd_ << ", state=" << state_;
367  // When destroy is called, close the socket immediately
368  closeNow();
369 
370  // Then call DelayedDestruction::destroy() to take care of
371  // whether or not we need immediate or delayed destruction
373 }
374 
376  VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
377  << ", evb=" << eventBase_ << ", state=" << state_
378  << ", events=" << std::hex << eventFlags_ << ")";
379  // Extract the fd, and set fd_ to -1 first, so closeNow() won't
380  // actually close the descriptor.
381  if (const auto socketSet = wShutdownSocketSet_.lock()) {
382  socketSet->remove(fd_);
383  }
384  int fd = fd_;
385  fd_ = -1;
386  // Call closeNow() to invoke all pending callbacks with an error.
387  closeNow();
388  // Update the EventHandler to stop using this fd.
389  // This can only be done after closeNow() unregisters the handler.
391  return fd;
392 }
393 
395  static const folly::SocketAddress anyAddress =
396  folly::SocketAddress("0.0.0.0", 0);
397  return anyAddress;
398 }
399 
401  const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
402  const auto newSS = wNewSS.lock();
403  const auto shutdownSocketSet = wShutdownSocketSet_.lock();
404 
405  if (newSS == shutdownSocketSet) {
406  return;
407  }
408 
409  if (shutdownSocketSet && fd_ != -1) {
411  }
412 
413  if (newSS && fd_ != -1) {
414  newSS->add(fd_);
415  }
416 
417  wShutdownSocketSet_ = wNewSS;
418 }
419 
421  int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
422  if (rv != 0) {
423  auto errnoCopy = errno;
424  throw AsyncSocketException(
426  withAddr("failed to set close-on-exec flag"),
427  errnoCopy);
428  }
429 }
430 
432  ConnectCallback* callback,
433  const folly::SocketAddress& address,
434  int timeout,
435  const OptionMap& options,
436  const folly::SocketAddress& bindAddr) noexcept {
437  DestructorGuard dg(this);
439 
440  addr_ = address;
441 
442  // Make sure we're in the uninitialized state
443  if (state_ != StateEnum::UNINIT) {
444  return invalidState(callback);
445  }
446 
447  connectTimeout_ = std::chrono::milliseconds(timeout);
449  // Make connect end time at least >= connectStartTime.
451 
452  assert(fd_ == -1);
454  connectCallback_ = callback;
455 
456  sockaddr_storage addrStorage;
457  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
458 
459  try {
460  // Create the socket
461  // Technically the first parameter should actually be a protocol family
462  // constant (PF_xxx) rather than an address family (AF_xxx), but the
463  // distinction is mainly just historical. In pretty much all
464  // implementations the PF_foo and AF_foo constants are identical.
465  fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
466  if (fd_ < 0) {
467  auto errnoCopy = errno;
468  throw AsyncSocketException(
470  withAddr("failed to create socket"),
471  errnoCopy);
472  }
473  disableTransparentFunctions(fd_, noTransparentTls_, noTSocks_);
474  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
476  }
478 
479  setCloseOnExec();
480 
481  // Put the socket in non-blocking mode
482  int flags = fcntl(fd_, F_GETFL, 0);
483  if (flags == -1) {
484  auto errnoCopy = errno;
485  throw AsyncSocketException(
487  withAddr("failed to get socket flags"),
488  errnoCopy);
489  }
490  int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
491  if (rv == -1) {
492  auto errnoCopy = errno;
493  throw AsyncSocketException(
495  withAddr("failed to put socket in non-blocking mode"),
496  errnoCopy);
497  }
498 
499 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
500  // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
501  rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
502  if (rv == -1) {
503  auto errnoCopy = errno;
504  throw AsyncSocketException(
506  "failed to enable F_SETNOSIGPIPE on socket",
507  errnoCopy);
508  }
509 #endif
510 
511  // By default, turn on TCP_NODELAY
512  // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
513  // setNoDelay() will log an error message if it fails.
514  // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
515  // is not created
516  if (address.getFamily() != AF_UNIX) {
517  (void)setNoDelay(true);
519  }
520 
521  VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
522  << ", fd=" << fd_ << ", host=" << address.describe().c_str();
523 
524  // bind the socket
525  if (bindAddr != anyAddress()) {
526  int one = 1;
527  if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
528  auto errnoCopy = errno;
529  doClose();
530  throw AsyncSocketException(
532  "failed to setsockopt prior to bind on " + bindAddr.describe(),
533  errnoCopy);
534  }
535 
536  bindAddr.getAddress(&addrStorage);
537 
538  if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
539  auto errnoCopy = errno;
540  doClose();
541  throw AsyncSocketException(
543  "failed to bind to async socket: " + bindAddr.describe(),
544  errnoCopy);
545  }
546  }
547 
548  // Apply the additional options if any.
549  for (const auto& opt : options) {
550  rv = opt.first.apply(fd_, opt.second);
551  if (rv != 0) {
552  auto errnoCopy = errno;
553  throw AsyncSocketException(
555  withAddr("failed to set socket option"),
556  errnoCopy);
557  }
558  }
559 
560  // Perform the connect()
561  address.getAddress(&addrStorage);
562 
563  if (tfoEnabled_) {
565  tfoAttempted_ = true;
566  } else {
567  if (socketConnect(saddr, addr_.getActualSize()) < 0) {
568  return;
569  }
570  }
571 
572  // If we're still here the connect() succeeded immediately.
573  // Fall through to call the callback outside of this try...catch block
574  } catch (const AsyncSocketException& ex) {
575  return failConnect(__func__, ex);
576  } catch (const std::exception& ex) {
577  // shouldn't happen, but handle it just in case
578  VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
579  << "): unexpected " << typeid(ex).name()
580  << " exception: " << ex.what();
583  withAddr(string("unexpected exception: ") + ex.what()));
584  return failConnect(__func__, tex);
585  }
586 
587  // The connection succeeded immediately
588  // The read callback may not have been set yet, and no writes may be pending
589  // yet, so we don't have to register for any events at the moment.
590  VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
591  assert(errMessageCallback_ == nullptr);
592  assert(readCallback_ == nullptr);
593  assert(writeReqHead_ == nullptr);
594  if (state_ != StateEnum::FAST_OPEN) {
596  }
598 }
599 
600 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
601  int rv = fsp::connect(fd_, saddr, len);
602  if (rv < 0) {
603  auto errnoCopy = errno;
604  if (errnoCopy == EINPROGRESS) {
607  } else {
608  throw AsyncSocketException(
610  "connect failed (immediately)",
611  errnoCopy);
612  }
613  }
614  return rv;
615 }
616 
618  // Connection in progress.
619  auto timeout = connectTimeout_.count();
620  if (timeout > 0) {
621  // Start a timer in case the connection takes too long.
623  throw AsyncSocketException(
625  withAddr("failed to schedule AsyncSocket connect timeout"));
626  }
627  }
628 }
629 
631  // Register for write events, so we'll
632  // be notified when the connection finishes/fails.
633  // Note that we don't register for a persistent event here.
634  assert(eventFlags_ == EventHandler::NONE);
637  throw AsyncSocketException(
639  withAddr("failed to register AsyncSocket connect handler"));
640  }
641 }
642 
644  ConnectCallback* callback,
645  const string& ip,
646  uint16_t port,
647  int timeout,
648  const OptionMap& options) noexcept {
649  DestructorGuard dg(this);
650  try {
651  connectCallback_ = callback;
652  connect(callback, folly::SocketAddress(ip, port), timeout, options);
653  } catch (const std::exception& ex) {
655  return failConnect(__func__, tex);
656  }
657 }
658 
660  connectCallback_ = nullptr;
662  closeNow();
663  }
664 }
665 
667  sendTimeout_ = milliseconds;
668  if (eventBase_) {
670  }
671 
672  // If we are currently pending on write requests, immediately update
673  // writeTimeout_ with the new value.
676  assert(state_ == StateEnum::ESTABLISHED);
677  assert((shutdownFlags_ & SHUT_WRITE) == 0);
678  if (sendTimeout_ > 0) {
682  withAddr("failed to reschedule send timeout in setSendTimeout"));
683  return failWrite(__func__, ex);
684  }
685  } else {
687  }
688  }
689 }
690 
692  VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this << ", fd=" << fd_
693  << ", callback=" << callback << ", state=" << state_;
694 
695  // In the latest stable kernel 4.14.3 as of 2017-12-04, unix domain
696  // socket does not support MSG_ERRQUEUE. So recvmsg(MSG_ERRQUEUE)
697  // will read application data from unix domain socket as error
698  // message, which breaks the message flow in application. Feel free
699  // to remove the next code block if MSG_ERRQUEUE is added for unix
700  // domain socket in the future.
701  if (callback != nullptr) {
703  if (localAddr_.getFamily() == AF_UNIX) {
704  LOG(ERROR) << "Failed to set ErrMessageCallback=" << callback
705  << " for Unix Doamin Socket where MSG_ERRQUEUE is unsupported,"
706  << " fd=" << fd_;
707  return;
708  }
709  }
710 
711  // Short circuit if callback is the same as the existing errMessageCallback_.
712  if (callback == errMessageCallback_) {
713  return;
714  }
715 
716  if (!msgErrQueueSupported) {
717  // Per-socket error message queue is not supported on this platform.
718  return invalidState(callback);
719  }
720 
721  DestructorGuard dg(this);
723 
724  if (callback == nullptr) {
725  // We should be able to reset the callback regardless of the
726  // socket state. It's important to have a reliable callback
727  // cancellation mechanism.
728  errMessageCallback_ = callback;
729  return;
730  }
731 
732  switch ((StateEnum)state_) {
735  case StateEnum::ESTABLISHED: {
736  errMessageCallback_ = callback;
737  return;
738  }
739  case StateEnum::CLOSED:
740  case StateEnum::ERROR:
741  // We should never reach here. SHUT_READ should always be set
742  // if we are in STATE_CLOSED or STATE_ERROR.
743  assert(false);
744  return invalidState(callback);
745  case StateEnum::UNINIT:
746  // We do not allow setErrMessageCB() to be called before we start
747  // connecting.
748  return invalidState(callback);
749  }
750 
751  // We don't put a default case in the switch statement, so that the compiler
752  // will warn us to update the switch statement if a new state is added.
753  return invalidState(callback);
754 }
755 
757  return errMessageCallback_;
758 }
759 
761  sendMsgParamCallback_ = callback;
762 }
763 
765  return sendMsgParamCallback_;
766 }
767 
769  VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
770  << ", callback=" << callback << ", state=" << state_;
771 
772  // Short circuit if callback is the same as the existing readCallback_.
773  //
774  // Note that this is needed for proper functioning during some cleanup cases.
775  // During cleanup we allow setReadCallback(nullptr) to be called even if the
776  // read callback is already unset and we have been detached from an event
777  // base. This check prevents us from asserting
778  // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
779  if (callback == readCallback_) {
780  return;
781  }
782 
783  /* We are removing a read callback */
784  if (callback == nullptr && immediateReadHandler_.isLoopCallbackScheduled()) {
786  }
787 
788  if (shutdownFlags_ & SHUT_READ) {
789  // Reads have already been shut down on this socket.
790  //
791  // Allow setReadCallback(nullptr) to be called in this case, but don't
792  // allow a new callback to be set.
793  //
794  // For example, setReadCallback(nullptr) can happen after an error if we
795  // invoke some other error callback before invoking readError(). The other
796  // error callback that is invoked first may go ahead and clear the read
797  // callback before we get a chance to invoke readError().
798  if (callback != nullptr) {
799  return invalidState(callback);
800  }
801  assert((eventFlags_ & EventHandler::READ) == 0);
802  readCallback_ = nullptr;
803  return;
804  }
805 
806  DestructorGuard dg(this);
808 
809  switch ((StateEnum)state_) {
812  // For convenience, we allow the read callback to be set while we are
813  // still connecting. We just store the callback for now. Once the
814  // connection completes we'll register for read events.
815  readCallback_ = callback;
816  return;
817  case StateEnum::ESTABLISHED: {
818  readCallback_ = callback;
819  uint16_t oldFlags = eventFlags_;
820  if (readCallback_) {
822  } else {
823  eventFlags_ &= ~EventHandler::READ;
824  }
825 
826  // Update our registration if our flags have changed
827  if (eventFlags_ != oldFlags) {
828  // We intentionally ignore the return value here.
829  // updateEventRegistration() will move us into the error state if it
830  // fails, and we don't need to do anything else here afterwards.
831  (void)updateEventRegistration();
832  }
833 
834  if (readCallback_) {
836  }
837  return;
838  }
839  case StateEnum::CLOSED:
840  case StateEnum::ERROR:
841  // We should never reach here. SHUT_READ should always be set
842  // if we are in STATE_CLOSED or STATE_ERROR.
843  assert(false);
844  return invalidState(callback);
845  case StateEnum::UNINIT:
846  // We do not allow setReadCallback() to be called before we start
847  // connecting.
848  return invalidState(callback);
849  }
850 
851  // We don't put a default case in the switch statement, so that the compiler
852  // will warn us to update the switch statement if a new state is added.
853  return invalidState(callback);
854 }
855 
857  return readCallback_;
858 }
859 
// Turns zero-copy writes on or off for this socket.
//
// When the platform supports the socket error queue, the requested value is
// always cached in zeroCopyVal_, even if no descriptor exists yet.
//
// Returns true when zeroCopyEnabled_ was updated. Note that this can still
// mean zero copy ended up disabled (see the getsockopt() fallback below).
// Returns false when msgErrQueueSupported is false, when fd_ is negative, or
// when both setsockopt() and getsockopt() fail.
860 bool AsyncSocket::setZeroCopy(bool enable) {
861  if (msgErrQueueSupported) {
// Record the request first so it is kept even on the early return below.
862  zeroCopyVal_ = enable;
863 
864  if (fd_ < 0) {
865  return false;
866  }
867 
868  int val = enable ? 1 : 0;
869  int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
870 
871  // if enable == false, set zeroCopyEnabled_ = false regardless
872  // if SO_ZEROCOPY is set or not
// The setsockopt() result above is not consulted on this path.
873  if (!enable) {
874  zeroCopyEnabled_ = enable;
875  return true;
876  }
877 
878  /* if the setsockopt failed, try to see if the socket inherited the flag
879  * since we cannot set SO_ZEROCOPY on a socket s = accept
880  */
881  if (ret) {
882  val = 0;
883  socklen_t optlen = sizeof(val);
884  ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
885 
886  if (!ret) {
// Adopt whatever the socket actually reports, which may be "off".
887  enable = val ? true : false;
888  }
889  }
890 
// ret is 0 here if either setsockopt() or the getsockopt() fallback worked.
891  if (!ret) {
892  zeroCopyEnabled_ = enable;
893 
894  return true;
895  }
896  }
897 
898  return false;
899 }
900 
902  zeroCopyReenableThreshold_ = threshold;
903 }
904 
907 }
908 
910  if (!zeroCopyEnabled_) {
911  // if the zeroCopyReenableCounter_ is > 0
912  // we try to dec and if we reach 0
913  // we set zeroCopyEnabled_ to true
915  if (0 == --zeroCopyReenableCounter_) {
916  zeroCopyEnabled_ = true;
917  return;
918  }
919  }
921  }
922 }
923 
924 void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
926  folly::IOBuf* ptr = buf.get();
927 
929  auto& p = idZeroCopyBufInfoMap_[ptr];
930  p.count_++;
931  CHECK(p.buf_.get() == nullptr);
932  p.buf_ = std::move(buf);
933 }
934 
938 
939  idZeroCopyBufInfoMap_[ptr].count_++;
940 }
941 
943  auto iter = idZeroCopyBufPtrMap_.find(id);
944  CHECK(iter != idZeroCopyBufPtrMap_.end());
945  auto ptr = iter->second;
946  auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
947  CHECK(iter1 != idZeroCopyBufInfoMap_.end());
948  if (0 == --iter1->second.count_) {
949  idZeroCopyBufInfoMap_.erase(iter1);
950  }
951 
952  idZeroCopyBufPtrMap_.erase(iter);
953 }
954 
// Stores `buf`, taking ownership, in the idZeroCopyBufInfoMap_ entry keyed
// by the buffer's raw address. The entry must not already own a buffer; that
// is enforced with CHECK. The entry's count_ is not modified here.
// NOTE(review): operator[] presumably creates the entry when it is absent
// (map-style semantics) -- the container type is not visible in this chunk.
955 void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
// Grab the raw pointer before the unique_ptr is moved from.
956  folly::IOBuf* ptr = buf.get();
957  auto& p = idZeroCopyBufInfoMap_[ptr];
958  CHECK(p.buf_.get() == nullptr);
959 
960  p.buf_ = std::move(buf);
961 }
962 
964  return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
965 }
966 
// Returns true when `cmsg` is a zero-copy notification: a SOL_IP/IP_RECVERR
// or SOL_IPV6/IPV6_RECVERR control message whose sock_extended_err payload
// has ee_errno == 0 and ee_origin == SO_EE_ORIGIN_ZEROCOPY.
// Returns false for every other message, and always returns false when
// FOLLY_HAVE_MSG_ERRQUEUE is not defined.
967 bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
968 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
969  if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
970  (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
// The control message payload is interpreted as a sock_extended_err.
971  const struct sock_extended_err* serr =
972  reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
973  return (
974  (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
975  }
976 #endif
// Keeps the parameter "used" when the block above is compiled out.
977  (void)cmsg;
978  return false;
979 }
980 
981 void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
982 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
983  const struct sock_extended_err* serr =
984  reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
985  uint32_t hi = serr->ee_data;
986  uint32_t lo = serr->ee_info;
987  // disable zero copy if the buffer was actually copied
988  if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
989  VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
990  << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
991  << "on " << fd_;
992  zeroCopyEnabled_ = false;
993  }
994 
995  for (uint32_t i = lo; i <= hi; i++) {
997  }
998 #else
999  (void)cmsg;
1000 #endif
1001 }
1002 
1004  WriteCallback* callback,
1005  const void* buf,
1006  size_t bytes,
1007  WriteFlags flags) {
1008  iovec op;
1009  op.iov_base = const_cast<void*>(buf);
1010  op.iov_len = bytes;
1011  writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
1012 }
1013 
1015  WriteCallback* callback,
1016  const iovec* vec,
1017  size_t count,
1018  WriteFlags flags) {
1019  writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
1020 }
1021 
1023  WriteCallback* callback,
1024  unique_ptr<IOBuf>&& buf,
1025  WriteFlags flags) {
1026  adjustZeroCopyFlags(flags);
1027 
1028  constexpr size_t kSmallSizeMax = 64;
1029  size_t count = buf->countChainElements();
1030  if (count <= kSmallSizeMax) {
1031  // suppress "warning: variable length array 'vec' is used [-Wvla]"
1033  FOLLY_GNU_DISABLE_WARNING("-Wvla")
1034  iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)];
1036 
1037  writeChainImpl(callback, vec, count, std::move(buf), flags);
1038  } else {
1039  iovec* vec = new iovec[count];
1040  writeChainImpl(callback, vec, count, std::move(buf), flags);
1041  delete[] vec;
1042  }
1043 }
1044 
1046  WriteCallback* callback,
1047  iovec* vec,
1048  size_t count,
1049  unique_ptr<IOBuf>&& buf,
1050  WriteFlags flags) {
1051  size_t veclen = buf->fillIov(vec, count);
1052  writeImpl(callback, vec, veclen, std::move(buf), flags);
1053 }
1054 
1056  WriteCallback* callback,
1057  const iovec* vec,
1058  size_t count,
1059  unique_ptr<IOBuf>&& buf,
1060  WriteFlags flags) {
1061  VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
1062  << ", callback=" << callback << ", count=" << count
1063  << ", state=" << state_;
1064  DestructorGuard dg(this);
1065  unique_ptr<IOBuf> ioBuf(std::move(buf));
1067 
1069  // No new writes may be performed after the write side of the socket has
1070  // been shutdown.
1071  //
1072  // We could just call callback->writeError() here to fail just this write.
1073  // However, fail hard and use invalidState() to fail all outstanding
1074  // callbacks and move the socket into the error state. There's most likely
1075  // a bug in the caller's code, so we abort everything rather than trying to
1076  // proceed as best we can.
1077  return invalidState(callback);
1078  }
1079 
1080  uint32_t countWritten = 0;
1081  uint32_t partialWritten = 0;
1082  ssize_t bytesWritten = 0;
1083  bool mustRegister = false;
1084  if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
1085  !connecting()) {
1086  if (writeReqHead_ == nullptr) {
1087  // If we are established and there are no other writes pending,
1088  // we can attempt to perform the write immediately.
1089  assert(writeReqTail_ == nullptr);
1090  assert((eventFlags_ & EventHandler::WRITE) == 0);
1091 
1092  auto writeResult = performWrite(
1093  vec, uint32_t(count), flags, &countWritten, &partialWritten);
1094  bytesWritten = writeResult.writeReturn;
1095  if (bytesWritten < 0) {
1096  auto errnoCopy = errno;
1097  if (writeResult.exception) {
1098  return failWrite(__func__, callback, 0, *writeResult.exception);
1099  }
1102  withAddr("writev failed"),
1103  errnoCopy);
1104  return failWrite(__func__, callback, 0, ex);
1105  } else if (countWritten == count) {
1106  // done, add the whole buffer
1107  if (countWritten && isZeroCopyRequest(flags)) {
1108  addZeroCopyBuf(std::move(ioBuf));
1109  }
1110  // We successfully wrote everything.
1111  // Invoke the callback and return.
1112  if (callback) {
1113  callback->writeSuccess();
1114  }
1115  return;
1116  } else { // continue writing the next writeReq
1117  // add just the ptr
1118  if (bytesWritten && isZeroCopyRequest(flags)) {
1119  addZeroCopyBuf(ioBuf.get());
1120  }
1121  if (bufferCallback_) {
1123  }
1124  }
1125  if (!connecting()) {
1126  // Writes might put the socket back into connecting state
1127  // if TFO is enabled, and using TFO fails.
1128  // This means that write timeouts would not be active, however
1129  // connect timeouts would affect this stage.
1130  mustRegister = true;
1131  }
1132  }
1133  } else if (!connecting()) {
1134  // Invalid state for writing
1135  return invalidState(callback);
1136  }
1137 
1138  // Create a new WriteRequest to add to the queue
1139  WriteRequest* req;
1140  try {
1142  this,
1143  callback,
1144  vec + countWritten,
1145  uint32_t(count - countWritten),
1146  partialWritten,
1147  uint32_t(bytesWritten),
1148  std::move(ioBuf),
1149  flags);
1150  } catch (const std::exception& ex) {
1151  // we mainly expect to catch std::bad_alloc here
1154  withAddr(string("failed to append new WriteRequest: ") + ex.what()));
1155  return failWrite(__func__, callback, size_t(bytesWritten), tex);
1156  }
1157  req->consume();
1158  if (writeReqTail_ == nullptr) {
1159  assert(writeReqHead_ == nullptr);
1160  writeReqHead_ = writeReqTail_ = req;
1161  } else {
1162  writeReqTail_->append(req);
1163  writeReqTail_ = req;
1164  }
1165 
1166  // Register for write events if we are established and not currently
1167  // waiting on write events
1168  if (mustRegister) {
1169  assert(state_ == StateEnum::ESTABLISHED);
1170  assert((eventFlags_ & EventHandler::WRITE) == 0);
1171  if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1172  assert(state_ == StateEnum::ERROR);
1173  return;
1174  }
1175  if (sendTimeout_ > 0) {
1176  // Schedule a timeout to fire if the write takes too long.
1180  withAddr("failed to schedule send timeout"));
1181  return failWrite(__func__, ex);
1182  }
1183  }
1184  }
1185 }
1186 
1188  if (writeReqTail_ == nullptr) {
1189  assert(writeReqHead_ == nullptr);
1190  writeReqHead_ = writeReqTail_ = req;
1191  req->start();
1192  } else {
1193  writeReqTail_->append(req);
1194  writeReqTail_ = req;
1195  }
1196 }
1197 
1199  VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
1200  << ", state=" << state_ << ", shutdownFlags=" << std::hex
1201  << (int)shutdownFlags_;
1202 
1203  // close() is only different from closeNow() when there are pending writes
1204  // that need to drain before we can close. In all other cases, just call
1205  // closeNow().
1206  //
1207  // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
1208  // STATE_ERROR if close() is invoked while a previous closeNow() or failure
1209  // is still running. (e.g., If there are multiple pending writes, and we
1210  // call writeError() on the first one, it may call close(). In this case we
1211  // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
1212  // writes will still be in the queue.)
1213  //
1214  // We only need to drain pending writes if we are still in STATE_CONNECTING
1215  // or STATE_ESTABLISHED
1216  if ((writeReqHead_ == nullptr) ||
1218  closeNow();
1219  return;
1220  }
1221 
1222  // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
1223  // destroyed until close() returns.
1224  DestructorGuard dg(this);
1226 
1227  // Since there are write requests pending, we have to set the
1228  // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
1229  // connect finishes and we finish writing these requests.
1230  //
1231  // Set SHUT_READ to indicate that reads are shut down, and set the
1232  // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
1233  // pending writes complete.
1235 
1236  // If a read callback is set, invoke readEOF() immediately to inform it that
1237  // the socket has been closed and no more data can be read.
1238  if (readCallback_) {
1239  // Disable reads if they are enabled
1241  // We're now in the error state; callbacks have been cleaned up
1242  assert(state_ == StateEnum::ERROR);
1243  assert(readCallback_ == nullptr);
1244  } else {
1245  ReadCallback* callback = readCallback_;
1246  readCallback_ = nullptr;
1247  callback->readEOF();
1248  }
1249  }
1250 }
1251 
1253  VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
1254  << ", state=" << state_ << ", shutdownFlags=" << std::hex
1255  << (int)shutdownFlags_;
1256  DestructorGuard dg(this);
1257  if (eventBase_) {
1259  }
1260 
1261  switch (state_) {
1263  case StateEnum::CONNECTING:
1264  case StateEnum::FAST_OPEN: {
1267 
1268  // If the write timeout was set, cancel it.
1270 
1271  // If we are registered for I/O events, unregister.
1274  if (!updateEventRegistration()) {
1275  // We will have been moved into the error state.
1276  assert(state_ == StateEnum::ERROR);
1277  return;
1278  }
1279  }
1280 
1283  }
1284 
1285  if (fd_ >= 0) {
1287  doClose();
1288  }
1289 
1291 
1293 
1294  if (readCallback_) {
1295  ReadCallback* callback = readCallback_;
1296  readCallback_ = nullptr;
1297  callback->readEOF();
1298  }
1299  return;
1300  }
1301  case StateEnum::CLOSED:
1302  // Do nothing. It's possible that we are being called recursively
1303  // from inside a callback that we invoked inside another call to close()
1304  // that is still running.
1305  return;
1306  case StateEnum::ERROR:
1307  // Do nothing. The error handling code has performed (or is performing)
1308  // cleanup.
1309  return;
1310  case StateEnum::UNINIT:
1311  assert(eventFlags_ == EventHandler::NONE);
1312  assert(connectCallback_ == nullptr);
1313  assert(readCallback_ == nullptr);
1314  assert(writeReqHead_ == nullptr);
1317  return;
1318  }
1319 
1320  LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
1321  << ") called in unknown state " << state_;
1322 }
1323 
1325  // Enable SO_LINGER, with the linger timeout set to 0.
1326  // This will trigger a TCP reset when we close the socket.
1327  if (fd_ >= 0) {
1328  struct linger optLinger = {1, 0};
1329  if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1330  VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
1331  << "on " << fd_ << ": errno=" << errno;
1332  }
1333  }
1334 
1335  // Then let closeNow() take care of the rest
1336  closeNow();
1337 }
1338 
1340  VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
1341  << ", state=" << state_ << ", shutdownFlags=" << std::hex
1342  << (int)shutdownFlags_;
1343 
1344  // If there are no pending writes, shutdownWrite() is identical to
1345  // shutdownWriteNow().
1346  if (writeReqHead_ == nullptr) {
1347  shutdownWriteNow();
1348  return;
1349  }
1350 
1352 
1353  // There are pending writes. Set SHUT_WRITE_PENDING so that the actual
1354  // shutdown will be performed once all writes complete.
1356 }
1357 
1359  VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this << ", fd=" << fd_
1360  << ", state=" << state_ << ", shutdownFlags=" << std::hex
1361  << (int)shutdownFlags_;
1362 
1363  if (shutdownFlags_ & SHUT_WRITE) {
1364  // Writes are already shutdown; nothing else to do.
1365  return;
1366  }
1367 
1368  // If SHUT_READ is already set, just call closeNow() to completely
1369  // close the socket. This can happen if close() was called with writes
1370  // pending, and then shutdownWriteNow() is called before all pending writes
1371  // complete.
1372  if (shutdownFlags_ & SHUT_READ) {
1373  closeNow();
1374  return;
1375  }
1376 
1377  DestructorGuard dg(this);
1378  if (eventBase_) {
1380  }
1381 
1382  switch (static_cast<StateEnum>(state_)) {
1383  case StateEnum::ESTABLISHED: {
1385 
1386  // If the write timeout was set, cancel it.
1388 
1389  // If we are registered for write events, unregister.
1391  // We will have been moved into the error state.
1392  assert(state_ == StateEnum::ERROR);
1393  return;
1394  }
1395 
1396  // Shutdown writes on the file descriptor
1397  shutdown(fd_, SHUT_WR);
1398 
1399  // Immediately fail all write requests
1401  return;
1402  }
1403  case StateEnum::CONNECTING: {
1404  // Set the SHUT_WRITE_PENDING flag.
1405  // When the connection completes, it will check this flag,
1406  // shutdown the write half of the socket, and then set SHUT_WRITE.
1408 
1409  // Immediately fail all write requests
1411  return;
1412  }
1413  case StateEnum::UNINIT:
1414  // Callers normally shouldn't call shutdownWriteNow() before the socket
1415  // even starts connecting. Nonetheless, go ahead and set
1416  // SHUT_WRITE_PENDING. Once the socket eventually connects it will
1417  // immediately shut down the write side of the socket.
1419  return;
1420  case StateEnum::FAST_OPEN:
1421  // In fast open state we haven't call connected yet, and if we shutdown
1422  // the writes, we will never try to call connect, so shut everything down
1424  // Immediately fail all write requests
1426  return;
1427  case StateEnum::CLOSED:
1428  case StateEnum::ERROR:
1429  // We should never get here. SHUT_WRITE should always be set
1430  // in STATE_CLOSED and STATE_ERROR.
1431  VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1432  << ", fd=" << fd_ << ") in unexpected state " << state_
1433  << " with SHUT_WRITE not set (" << std::hex << (int)shutdownFlags_
1434  << ")";
1435  assert(false);
1436  return;
1437  }
1438 
1439  LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this
1440  << ", fd=" << fd_ << ") called in unknown state " << state_;
1441 }
1442 
1444  if (fd_ == -1) {
1445  return false;
1446  }
1447  struct pollfd fds[1];
1448  fds[0].fd = fd_;
1449  fds[0].events = POLLIN;
1450  fds[0].revents = 0;
1451  int rc = poll(fds, 1, 0);
1452  return rc == 1;
1453 }
1454 
1456  if (fd_ == -1) {
1457  return false;
1458  }
1459  struct pollfd fds[1];
1460  fds[0].fd = fd_;
1461  fds[0].events = POLLOUT;
1462  fds[0].revents = 0;
1463  int rc = poll(fds, 1, 0);
1464  return rc == 1;
1465 }
1466 
1468  return ioHandler_.isPending();
1469 }
1470 
1471 bool AsyncSocket::hangup() const {
1472  if (fd_ == -1) {
1473  // sanity check, no one should ask for hangup if we are not connected.
1474  assert(false);
1475  return false;
1476  }
1477 #ifdef POLLRDHUP // Linux-only
1478  struct pollfd fds[1];
1479  fds[0].fd = fd_;
1480  fds[0].events = POLLRDHUP | POLLHUP;
1481  fds[0].revents = 0;
1482  poll(fds, 1, 0);
1483  return (fds[0].revents & (POLLRDHUP | POLLHUP)) != 0;
1484 #else
1485  return false;
1486 #endif
1487 }
1488 
1489 bool AsyncSocket::good() const {
1490  return (
1493  (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1494 }
1495 
1496 bool AsyncSocket::error() const {
1497  return (state_ == StateEnum::ERROR);
1498 }
1499 
1501  VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1502  << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1503  << ", state=" << state_ << ", events=" << std::hex << eventFlags_
1504  << ")";
1505  assert(eventBase_ == nullptr);
1506  eventBase->dcheckIsInEventBaseThread();
1507 
1508  eventBase_ = eventBase;
1509  ioHandler_.attachEventBase(eventBase);
1510 
1512 
1513  writeTimeout_.attachEventBase(eventBase);
1514  if (evbChangeCb_) {
1515  evbChangeCb_->evbAttached(this);
1516  }
1517 }
1518 
1520  VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1521  << ", old evb=" << eventBase_ << ", state=" << state_
1522  << ", events=" << std::hex << eventFlags_ << ")";
1523  assert(eventBase_ != nullptr);
1525 
1526  eventBase_ = nullptr;
1527 
1529 
1532  if (evbChangeCb_) {
1533  evbChangeCb_->evbDetached(this);
1534  }
1535 }
1536 
1538  DCHECK(eventBase_ != nullptr);
1540 
1541  return !writeTimeout_.isScheduled();
1542 }
1543 
1545  if (fd_ >= 0) {
1546  try {
1548  cachePeerAddress();
1549  } catch (const std::system_error& e) {
1550  if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1551  VLOG(2) << "Error caching addresses: " << e.code().value() << ", "
1552  << e.code().message();
1553  }
1554  }
1555  }
1556 }
1557 
1559  if (!localAddr_.isInitialized()) {
1561  }
1562 }
1563 
1565  if (!addr_.isInitialized()) {
1567  }
1568 }
1569 
1572  return (!idZeroCopyBufPtrMap_.empty());
1573 }
1574 
1577  *address = localAddr_;
1578 }
1579 
1581  cachePeerAddress();
1582  *address = addr_;
1583 }
1584 
1586  return detail::tfo_succeeded(fd_);
1587 }
1588 
1589 int AsyncSocket::setNoDelay(bool noDelay) {
1590  if (fd_ < 0) {
1591  VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket " << this
1592  << "(state=" << state_ << ")";
1593  return EINVAL;
1594  }
1595 
1596  int value = noDelay ? 1 : 0;
1597  if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1598  int errnoCopy = errno;
1599  VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket " << this
1600  << " (fd=" << fd_ << ", state=" << state_
1601  << "): " << errnoStr(errnoCopy);
1602  return errnoCopy;
1603  }
1604 
1605  return 0;
1606 }
1607 
1609 #ifndef TCP_CONGESTION
1610 #define TCP_CONGESTION 13
1611 #endif
1612 
1613  if (fd_ < 0) {
1614  VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1615  << "socket " << this << "(state=" << state_ << ")";
1616  return EINVAL;
1617  }
1618 
1619  if (setsockopt(
1620  fd_,
1621  IPPROTO_TCP,
1623  cname.c_str(),
1624  socklen_t(cname.length() + 1)) != 0) {
1625  int errnoCopy = errno;
1626  VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket " << this
1627  << "(fd=" << fd_ << ", state=" << state_
1628  << "): " << errnoStr(errnoCopy);
1629  return errnoCopy;
1630  }
1631 
1632  return 0;
1633 }
1634 
1635 int AsyncSocket::setQuickAck(bool quickack) {
1636  (void)quickack;
1637  if (fd_ < 0) {
1638  VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket " << this
1639  << "(state=" << state_ << ")";
1640  return EINVAL;
1641  }
1642 
1643 #ifdef TCP_QUICKACK // Linux-only
1644  int value = quickack ? 1 : 0;
1645  if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1646  int errnoCopy = errno;
1647  VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket" << this
1648  << "(fd=" << fd_ << ", state=" << state_
1649  << "): " << errnoStr(errnoCopy);
1650  return errnoCopy;
1651  }
1652 
1653  return 0;
1654 #else
1655  return ENOSYS;
1656 #endif
1657 }
1658 
1659 int AsyncSocket::setSendBufSize(size_t bufsize) {
1660  if (fd_ < 0) {
1661  VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1662  << this << "(state=" << state_ << ")";
1663  return EINVAL;
1664  }
1665 
1666  if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) != 0) {
1667  int errnoCopy = errno;
1668  VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket" << this
1669  << "(fd=" << fd_ << ", state=" << state_
1670  << "): " << errnoStr(errnoCopy);
1671  return errnoCopy;
1672  }
1673 
1674  return 0;
1675 }
1676 
1677 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1678  if (fd_ < 0) {
1679  VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1680  << this << "(state=" << state_ << ")";
1681  return EINVAL;
1682  }
1683 
1684  if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) != 0) {
1685  int errnoCopy = errno;
1686  VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket" << this
1687  << "(fd=" << fd_ << ", state=" << state_
1688  << "): " << errnoStr(errnoCopy);
1689  return errnoCopy;
1690  }
1691 
1692  return 0;
1693 }
1694 
1696  if (fd_ < 0) {
1697  VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket " << this
1698  << "(state=" << state_ << ")";
1699  return EINVAL;
1700  }
1701 
1702  if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) != 0) {
1703  int errnoCopy = errno;
1704  VLOG(2) << "failed to set socket namespace option on AsyncSocket" << this
1705  << "(fd=" << fd_ << ", state=" << state_
1706  << "): " << errnoStr(errnoCopy);
1707  return errnoCopy;
1708  }
1709 
1710  return 0;
1711 }
1712 
1714  VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
1715  << ", events=" << std::hex << events << ", state=" << state_;
1716  DestructorGuard dg(this);
1717  assert(events & EventHandler::READ_WRITE);
1719 
1720  uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1721  EventBase* originalEventBase = eventBase_;
1722  // If we got there it means that either EventHandler::READ or
1723  // EventHandler::WRITE is set. Any of these flags can
1724  // indicate that there are messages available in the socket
1725  // error message queue.
1726  // Return if we handle any error messages - this is to avoid
1727  // unnecessary read/write calls
1728  if (handleErrMessages()) {
1729  return;
1730  }
1731 
1732  // Return now if handleErrMessages() detached us from our EventBase
1733  if (eventBase_ != originalEventBase) {
1734  return;
1735  }
1736 
1737  if (relevantEvents == EventHandler::READ) {
1738  handleRead();
1739  } else if (relevantEvents == EventHandler::WRITE) {
1740  handleWrite();
1741  } else if (relevantEvents == EventHandler::READ_WRITE) {
1742  // If both read and write events are ready, process writes first.
1743  handleWrite();
1744 
1745  // Return now if handleWrite() detached us from our EventBase
1746  if (eventBase_ != originalEventBase) {
1747  return;
1748  }
1749 
1750  // Only call handleRead() if a read callback is still installed.
1751  // (It's possible that the read callback was uninstalled during
1752  // handleWrite().)
1753  if (readCallback_) {
1754  handleRead();
1755  }
1756  } else {
1757  VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1758  << std::hex << events << "(this=" << this << ")";
1759  abort();
1760  }
1761 }
1762 
1764 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1765  VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1766  << ", buflen=" << *buflen;
1767 
1768  if (preReceivedData_ && !preReceivedData_->empty()) {
1769  VLOG(5) << "AsyncSocket::performRead() this=" << this
1770  << ", reading pre-received data";
1771 
1772  io::Cursor cursor(preReceivedData_.get());
1773  auto len = cursor.pullAtMost(*buf, *buflen);
1774 
1775  IOBufQueue queue;
1777  queue.trimStart(len);
1778  preReceivedData_ = queue.move();
1779 
1780  appBytesReceived_ += len;
1781  return ReadResult(len);
1782  }
1783 
1784  ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1785  if (bytes < 0) {
1786  if (errno == EAGAIN || errno == EWOULDBLOCK) {
1787  // No more data to read right now.
1788  return ReadResult(READ_BLOCKING);
1789  } else {
1790  return ReadResult(READ_ERROR);
1791  }
1792  } else {
1793  appBytesReceived_ += bytes;
1794  return ReadResult(bytes);
1795  }
1796 }
1797 
1798 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1799  // no matter what, buffer should be preapared for non-ssl socket
1800  CHECK(readCallback_);
1801  readCallback_->getReadBuffer(buf, buflen);
1802 }
1803 
// Reads control messages from the socket error queue (recvmsg() with
// MSG_ERRQUEUE) until recvmsg() fails, and returns how many control messages
// were seen.  Returns 0 without reading when no error-message callback is
// installed and no zero-copy buffers are being tracked, or when
// FOLLY_HAVE_MSG_ERRQUEUE is not defined.
1805  // This method has non-empty implementation only for platforms
1806  // supporting per-socket error queues.
1807  VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
1808  << ", state=" << state_;
// Nothing to do if nobody consumes error-queue messages.
1809  if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty()) {
1810  VLOG(7) << "AsyncSocket::handleErrMessages(): "
1811  << "no callback installed - exiting.";
1812  return 0;
1813  }
1814 
1815 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
// ctrl receives the ancillary (control) data; data is a one-byte payload
// buffer whose contents are never examined below.
1816  uint8_t ctrl[1024];
1817  unsigned char data;
1818  struct msghdr msg;
1819  iovec entry;
1820 
1821  entry.iov_base = &data;
1822  entry.iov_len = sizeof(data);
1823  msg.msg_iov = &entry;
1824  msg.msg_iovlen = 1;
1825  msg.msg_name = nullptr;
1826  msg.msg_namelen = 0;
1827  msg.msg_control = ctrl;
1828  msg.msg_controllen = sizeof(ctrl);
1829  msg.msg_flags = 0;
1830 
1831  int ret;
// num counts every control message header visited, whichever branch handles it.
1832  size_t num = 0;
// NOTE(review): msg.msg_controllen is set to sizeof(ctrl) only once, above
// this loop.  recvmsg() overwrites it with the length of the control data it
// actually returned, so later iterations offer the kernel a control buffer
// only as large as the previous message's control data.  Consider resetting
// msg_controllen (and msg_flags) at the top of each iteration.
1833  while (true) {
1834  ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
1835  VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
1836 
// A negative return ends the loop.  EAGAIN is the normal "queue drained"
// exit; any other errno is logged and reported via failErrMessageRead().
1837  if (ret < 0) {
1838  if (errno != EAGAIN) {
1839  auto errnoCopy = errno;
1840  LOG(ERROR) << "::recvmsg exited with code " << ret
1841  << ", errno: " << errnoCopy;
1844  withAddr("recvmsg() failed"),
1845  errnoCopy);
1846  failErrMessageRead(__func__, ex);
1847  }
1848 
1849  return num;
1850  }
1851 
// Walk every control message attached to what recvmsg() just returned.
1852  for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1853  cmsg != nullptr && cmsg->cmsg_len != 0;
1854  cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1855  ++num;
// Zero-copy messages are handled internally by processZeroCopyMsg(); all
// other messages take the errMessageCallback_ branch when one is installed.
1856  if (isZeroCopyMsg(*cmsg)) {
1857  processZeroCopyMsg(*cmsg);
1858  } else {
1859  if (errMessageCallback_) {
1861  }
1862  }
1863  }
1864  }
1865 #else
1866  return 0;
1867 #endif // FOLLY_HAVE_MSG_ERRQUEUE
1868 }
1869 
1872  if (idZeroCopyBufPtrMap_.empty()) {
1873  return true;
1874  }
1875 
1877 
1878  return idZeroCopyBufPtrMap_.empty();
1879 }
1880 
1882  VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1883  << ", state=" << state_;
1884  assert(state_ == StateEnum::ESTABLISHED);
1885  assert((shutdownFlags_ & SHUT_READ) == 0);
1886  assert(readCallback_ != nullptr);
1887  assert(eventFlags_ & EventHandler::READ);
1888 
1889  // Loop until:
1890  // - a read attempt would block
1891  // - readCallback_ is uninstalled
1892  // - the number of loop iterations exceeds the optional maximum
1893  // - this AsyncSocket is moved to another EventBase
1894  //
1895  // When we invoke readDataAvailable() it may uninstall the readCallback_,
1896  // which is why need to check for it here.
1897  //
1898  // The last bullet point is slightly subtle. readDataAvailable() may also
1899  // detach this socket from this EventBase. However, before
1900  // readDataAvailable() returns another thread may pick it up, attach it to
1901  // a different EventBase, and install another readCallback_. We need to
1902  // exit immediately after readDataAvailable() returns if the eventBase_ has
1903  // changed. (The caller must perform some sort of locking to transfer the
1904  // AsyncSocket between threads properly. This will be sufficient to ensure
1905  // that this thread sees the updated eventBase_ variable after
1906  // readDataAvailable() returns.)
1907  uint16_t numReads = 0;
1908  EventBase* originalEventBase = eventBase_;
1909  while (readCallback_ && eventBase_ == originalEventBase) {
1910  // Get the buffer to read into.
1911  void* buf = nullptr;
1912  size_t buflen = 0, offset = 0;
1913  try {
1914  prepareReadBuffer(&buf, &buflen);
1915  VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1916  } catch (const AsyncSocketException& ex) {
1917  return failRead(__func__, ex);
1918  } catch (const std::exception& ex) {
1921  string("ReadCallback::getReadBuffer() "
1922  "threw exception: ") +
1923  ex.what());
1924  return failRead(__func__, tex);
1925  } catch (...) {
1928  "ReadCallback::getReadBuffer() threw "
1929  "non-exception type");
1930  return failRead(__func__, ex);
1931  }
1932  if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1935  "ReadCallback::getReadBuffer() returned "
1936  "empty buffer");
1937  return failRead(__func__, ex);
1938  }
1939 
1940  // Perform the read
1941  auto readResult = performRead(&buf, &buflen, &offset);
1942  auto bytesRead = readResult.readReturn;
1943  VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1944  << bytesRead << " bytes";
1945  if (bytesRead > 0) {
1946  if (!isBufferMovable_) {
1947  readCallback_->readDataAvailable(size_t(bytesRead));
1948  } else {
1950  VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1951  << "buf=" << buf << ", " << bytesRead << "/" << buflen
1952  << ", offset=" << offset;
1953  auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1954  readBuf->trimStart(offset);
1955  readBuf->trimEnd(buflen - offset - bytesRead);
1957  }
1958 
1959  // Fall through and continue around the loop if the read
1960  // completely filled the available buffer.
1961  // Note that readCallback_ may have been uninstalled or changed inside
1962  // readDataAvailable().
1963  if (size_t(bytesRead) < buflen) {
1964  return;
1965  }
1966  } else if (bytesRead == READ_BLOCKING) {
1967  // No more data to read right now.
1968  return;
1969  } else if (bytesRead == READ_ERROR) {
1970  readErr_ = READ_ERROR;
1971  if (readResult.exception) {
1972  return failRead(__func__, *readResult.exception);
1973  }
1974  auto errnoCopy = errno;
1977  withAddr("recv() failed"),
1978  errnoCopy);
1979  return failRead(__func__, ex);
1980  } else {
1981  assert(bytesRead == READ_EOF);
1982  readErr_ = READ_EOF;
1983  // EOF
1985  if (!updateEventRegistration(0, EventHandler::READ)) {
1986  // we've already been moved into STATE_ERROR
1987  assert(state_ == StateEnum::ERROR);
1988  assert(readCallback_ == nullptr);
1989  return;
1990  }
1991 
1992  ReadCallback* callback = readCallback_;
1993  readCallback_ = nullptr;
1994  callback->readEOF();
1995  return;
1996  }
1997  if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1998  if (readCallback_ != nullptr) {
1999  // We might still have data in the socket.
2000  // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
2002  }
2003  return;
2004  }
2005  }
2006 }
2007 
2020  VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
2021  << ", state=" << state_;
2022  DestructorGuard dg(this);
2023 
2024  if (state_ == StateEnum::CONNECTING) {
2025  handleConnect();
2026  return;
2027  }
2028 
2029  // Normal write
2030  assert(state_ == StateEnum::ESTABLISHED);
2031  assert((shutdownFlags_ & SHUT_WRITE) == 0);
2032  assert(writeReqHead_ != nullptr);
2033 
2034  // Loop until we run out of write requests,
2035  // or until this socket is moved to another EventBase.
2036  // (See the comment in handleRead() explaining how this can happen.)
2037  EventBase* originalEventBase = eventBase_;
2038  while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
2039  auto writeResult = writeReqHead_->performWrite();
2040  if (writeResult.writeReturn < 0) {
2041  if (writeResult.exception) {
2042  return failWrite(__func__, *writeResult.exception);
2043  }
2044  auto errnoCopy = errno;
2047  withAddr("writev() failed"),
2048  errnoCopy);
2049  return failWrite(__func__, ex);
2050  } else if (writeReqHead_->isComplete()) {
2051  // We finished this request
2052  WriteRequest* req = writeReqHead_;
2053  writeReqHead_ = req->getNext();
2054 
2055  if (writeReqHead_ == nullptr) {
2056  writeReqTail_ = nullptr;
2057  // This is the last write request.
2058  // Unregister for write events and cancel the send timer
2059  // before we invoke the callback. We have to update the state properly
2060  // before calling the callback, since it may want to detach us from
2061  // the EventBase.
2063  if (!updateEventRegistration(0, EventHandler::WRITE)) {
2064  assert(state_ == StateEnum::ERROR);
2065  return;
2066  }
2067  // Stop the send timeout
2069  }
2070  assert(!writeTimeout_.isScheduled());
2071 
2072  // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
2073  // we finish sending the last write request.
2074  //
2075  // We have to do this before invoking writeSuccess(), since
2076  // writeSuccess() may detach us from our EventBase.
2078  assert(connectCallback_ == nullptr);
2080 
2081  if (shutdownFlags_ & SHUT_READ) {
2082  // Reads have already been shutdown. Fully close the socket and
2083  // move to STATE_CLOSED.
2084  //
2085  // Note: This code currently moves us to STATE_CLOSED even if
2086  // close() hasn't ever been called. This can occur if we have
2087  // received EOF from the peer and shutdownWrite() has been called
2088  // locally. Should we bother staying in STATE_ESTABLISHED in this
2089  // case, until close() is actually called? I can't think of a
2090  // reason why we would need to do so. No other operations besides
2091  // calling close() or destroying the socket can be performed at
2092  // this point.
2093  assert(readCallback_ == nullptr);
2094  state_ = StateEnum::CLOSED;
2095  if (fd_ >= 0) {
2097  doClose();
2098  }
2099  } else {
2100  // Reads are still enabled, so we are only doing a half-shutdown
2101  shutdown(fd_, SHUT_WR);
2102  }
2103  }
2104  }
2105 
2106  // Invoke the callback
2107  WriteCallback* callback = req->getCallback();
2108  req->destroy();
2109  if (callback) {
2110  callback->writeSuccess();
2111  }
2112  // We'll continue around the loop, trying to write another request
2113  } else {
2114  // Partial write.
2115  if (bufferCallback_) {
2117  }
2119  // Stop after a partial write; it's highly likely that a subsequent write
2120  // attempt will just return EAGAIN.
2121  //
2122  // Ensure that we are registered for write events.
2123  if ((eventFlags_ & EventHandler::WRITE) == 0) {
2124  if (!updateEventRegistration(EventHandler::WRITE, 0)) {
2125  assert(state_ == StateEnum::ERROR);
2126  return;
2127  }
2128  }
2129 
2130  // Reschedule the send timeout, since we have made some write progress.
2131  if (sendTimeout_ > 0) {
2135  withAddr("failed to reschedule write timeout"));
2136  return failWrite(__func__, ex);
2137  }
2138  }
2139  return;
2140  }
2141  }
2142  if (!writeReqHead_ && bufferCallback_) {
2144  }
2145 }
2146 
2148  // We currently don't attempt to perform optimistic reads in AsyncSocket.
2149  // (However, note that some subclasses do override this method.)
2150  //
2151  // Simply calling handleRead() here would be bad, as this would call
2152  // readCallback_->getReadBuffer(), forcing the callback to allocate a read
2153  // buffer even though no data may be available. This would waste lots of
2154  // memory, since the buffer will sit around unused until the socket actually
2155  // becomes readable.
2156  //
2157  // Checking if the socket is readable now also seems like it would probably
2158  // be a pessimism. In most cases it probably wouldn't be readable, and we
2159  // would just waste an extra system call. Even if it is readable, waiting to
2160  // find out from libevent on the next event loop doesn't seem that bad.
2161  //
2162  // The exception to this is if we have pre-received data. In that case there
2163  // is definitely data available immediately.
2164  if (preReceivedData_ && !preReceivedData_->empty()) {
2165  handleRead();
2166  }
2167 }
2168 
2170  // Our callers should already be holding a DestructorGuard, but grab
2171  // one here just to make sure, in case one of our calling code paths ever
2172  // changes.
2173  DestructorGuard dg(this);
2174  // If we have a readCallback_, make sure we enable read events. We
2175  // may already be registered for reads if connectSuccess() set
2176  // the read calback.
2178  assert(state_ == StateEnum::ESTABLISHED);
2179  assert((shutdownFlags_ & SHUT_READ) == 0);
2180  if (!updateEventRegistration(EventHandler::READ, 0)) {
2181  assert(state_ == StateEnum::ERROR);
2182  return;
2183  }
2185  } else if (readCallback_ == nullptr) {
2186  // Unregister for read events.
2187  updateEventRegistration(0, EventHandler::READ);
2188  }
2189 
2190  // If we have write requests pending, try to send them immediately.
2191  // Since we just finished accepting, there is a very good chance that we can
2192  // write without blocking.
2193  //
2194  // However, we only process them if EventHandler::WRITE is not already set,
2195  // which means that we're already blocked on a write attempt. (This can
2196  // happen if connectSuccess() called write() before returning.)
2198  // Call handleWrite() to perform write processing.
2199  handleWrite();
2200  } else if (writeReqHead_ == nullptr) {
2201  // Unregister for write event.
2202  updateEventRegistration(0, EventHandler::WRITE);
2203  }
2204 }
2205 
2207  VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
2208  << ", state=" << state_;
2209  assert(state_ == StateEnum::CONNECTING);
2210  // SHUT_WRITE can never be set while we are still connecting;
2211  // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
2212  // finishes
2213  assert((shutdownFlags_ & SHUT_WRITE) == 0);
2214 
2215  // In case we had a connect timeout, cancel the timeout
2217  // We don't use a persistent registration when waiting on a connect event,
2218  // so we have been automatically unregistered now. Update eventFlags_ to
2219  // reflect reality.
2220  assert(eventFlags_ == EventHandler::WRITE);
2222 
2223  // Call getsockopt() to check if the connect succeeded
2224  int error;
2225  socklen_t len = sizeof(error);
2226  int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
2227  if (rv != 0) {
2228  auto errnoCopy = errno;
2231  withAddr("error calling getsockopt() after connect"),
2232  errnoCopy);
2233  VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd=" << fd_
2234  << " host=" << addr_.describe() << ") exception:" << ex.what();
2235  return failConnect(__func__, ex);
2236  }
2237 
2238  if (error != 0) {
2240  AsyncSocketException::NOT_OPEN, "connect failed", error);
2241  VLOG(2) << "AsyncSocket::handleConnect(this=" << this << ", fd=" << fd_
2242  << " host=" << addr_.describe() << ") exception: " << ex.what();
2243  return failConnect(__func__, ex);
2244  }
2245 
2246  // Move into STATE_ESTABLISHED
2247  state_ = StateEnum::ESTABLISHED;
2248 
2249  // If SHUT_WRITE_PENDING is set and we don't have any write requests to
2250  // perform, immediately shutdown the write half of the socket.
2251  if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
2252  // SHUT_READ shouldn't be set. If close() is called on the socket while we
2253  // are still connecting we just abort the connect rather than waiting for
2254  // it to complete.
2255  assert((shutdownFlags_ & SHUT_READ) == 0);
2256  shutdown(fd_, SHUT_WR);
2258  }
2259 
2260  VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
2261  << "successfully connected; state=" << state_;
2262 
2263  // Remember the EventBase we are attached to, before we start invoking any
2264  // callbacks (since the callbacks may call detachEventBase()).
2265  EventBase* originalEventBase = eventBase_;
2266 
2268  // Note that the connect callback may have changed our state.
2269  // (set or unset the read callback, called write(), closed the socket, etc.)
2270  // The following code needs to handle these situations correctly.
2271  //
2272  // If the socket has been closed, readCallback_ and writeReqHead_ will
2273  // always be nullptr, so that will prevent us from trying to read or write.
2274  //
2275  // The main thing to check for is if eventBase_ is still originalEventBase.
2276  // If not, we have been detached from this event base, so we shouldn't
2277  // perform any more operations.
2278  if (eventBase_ != originalEventBase) {
2279  return;
2280  }
2281 
2283 }
2284 
2286  VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
2287  << "state=" << state_ << ", events=" << std::hex << eventFlags_;
2288  DestructorGuard dg(this);
2290 
2291  if (state_ == StateEnum::CONNECTING) {
2292  // connect() timed out
2293  // Unregister for I/O events.
2294  if (connectCallback_) {
2298  "connect timed out after {}ms", connectTimeout_.count()));
2299  failConnect(__func__, ex);
2300  } else {
2301  // we faced a connect error without a connect callback, which could
2302  // happen due to TFO.
2304  AsyncSocketException::TIMED_OUT, "write timed out during connection");
2305  failWrite(__func__, ex);
2306  }
2307  } else {
2308  // a normal write operation timed out
2311  folly::sformat("write timed out after {}ms", sendTimeout_));
2312  failWrite(__func__, ex);
2313  }
2314 }
2315 
2316 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
2317  return detail::tfo_sendmsg(fd, msg, msg_flags);
2318 }
2319 
2321 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
2322  ssize_t totalWritten = 0;
2323  if (state_ == StateEnum::FAST_OPEN) {
2324  sockaddr_storage addr;
2325  auto len = addr_.getAddress(&addr);
2326  msg->msg_name = &addr;
2327  msg->msg_namelen = len;
2328  totalWritten = tfoSendMsg(fd_, msg, msg_flags);
2329  if (totalWritten >= 0) {
2330  tfoFinished_ = true;
2332  // We schedule this asynchronously so that we don't end up
2333  // invoking initial read or write while a write is in progress.
2335  } else if (errno == EINPROGRESS) {
2336  VLOG(4) << "TFO falling back to connecting";
2337  // A normal sendmsg doesn't return EINPROGRESS, however
2338  // TFO might fallback to connecting if there is no
2339  // cookie.
2341  try {
2344  } catch (const AsyncSocketException& ex) {
2345  return WriteResult(
2346  WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2347  }
2348  // Let's fake it that no bytes were written and return an errno.
2349  errno = EAGAIN;
2350  totalWritten = -1;
2351  } else if (errno == EOPNOTSUPP) {
2352  // Try falling back to connecting.
2353  VLOG(4) << "TFO not supported";
2355  try {
2356  int ret = socketConnect((const sockaddr*)&addr, len);
2357  if (ret == 0) {
2358  // connect succeeded immediately
2359  // Treat this like no data was written.
2362  }
2363  // If there was no exception during connections,
2364  // we would return that no bytes were written.
2365  errno = EAGAIN;
2366  totalWritten = -1;
2367  } catch (const AsyncSocketException& ex) {
2368  return WriteResult(
2369  WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2370  }
2371  } else if (errno == EAGAIN) {
2372  // Normally sendmsg would indicate that the write would block.
2373  // However in the fast open case, it would indicate that sendmsg
2374  // fell back to a connect. This is a return code from connect()
2375  // instead, and is an error condition indicating no fds available.
2376  return WriteResult(
2377  WRITE_ERROR,
2378  std::make_unique<AsyncSocketException>(
2379  AsyncSocketException::UNKNOWN, "No more free local ports"));
2380  }
2381  } else {
2382  totalWritten = ::sendmsg(fd, msg, msg_flags);
2383  }
2384  return WriteResult(totalWritten);
2385 }
2386 
2388  const iovec* vec,
2389  uint32_t count,
2390  WriteFlags flags,
2391  uint32_t* countWritten,
2392  uint32_t* partialWritten) {
2393  // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
2394  // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
2395  // (since it may terminate the program if the main program doesn't explicitly
2396  // ignore it).
2397  struct msghdr msg;
2398  msg.msg_name = nullptr;
2399  msg.msg_namelen = 0;
2400  msg.msg_iov = const_cast<iovec*>(vec);
2401  msg.msg_iovlen = std::min<size_t>(count, kIovMax);
2402  msg.msg_flags = 0;
2403  msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
2404  CHECK_GE(
2406  msg.msg_controllen);
2407 
2408  if (msg.msg_controllen != 0) {
2409  msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
2410  sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
2411  } else {
2412  msg.msg_control = nullptr;
2413  }
2414  int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
2415 
2416  auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2417  auto totalWritten = writeResult.writeReturn;
2418  if (totalWritten < 0 && zeroCopyEnabled_ && errno == ENOBUFS) {
2419  // workaround for running with zerocopy enabled but without a big enough
2420  // memlock value - see ulimit -l
2421  zeroCopyEnabled_ = false;
2423  msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
2424  writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2425  totalWritten = writeResult.writeReturn;
2426  }
2427  if (totalWritten < 0) {
2428  bool tryAgain = (errno == EAGAIN);
2429 #ifdef __APPLE__
2430  // Apple has a bug where doing a second write on a socket which we
2431  // have opened with TFO causes an ENOTCONN to be thrown. However the
2432  // socket is really connected, so treat ENOTCONN as a EAGAIN until
2433  // this bug is fixed.
2434  tryAgain |= (errno == ENOTCONN);
2435 #endif
2436 
2437  if (!writeResult.exception && tryAgain) {
2438  // TCP buffer is full; we can't write any more data right now.
2439  *countWritten = 0;
2440  *partialWritten = 0;
2441  return WriteResult(0);
2442  }
2443  // error
2444  *countWritten = 0;
2445  *partialWritten = 0;
2446  return writeResult;
2447  }
2448 
2449  appBytesWritten_ += totalWritten;
2450 
2451  uint32_t bytesWritten;
2452  uint32_t n;
2453  for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
2454  const iovec* v = vec + n;
2455  if (v->iov_len > bytesWritten) {
2456  // Partial write finished in the middle of this iovec
2457  *countWritten = n;
2458  *partialWritten = bytesWritten;
2459  return WriteResult(totalWritten);
2460  }
2461 
2462  bytesWritten -= uint32_t(v->iov_len);
2463  }
2464 
2465  assert(bytesWritten == 0);
2466  *countWritten = n;
2467  *partialWritten = 0;
2468  return WriteResult(totalWritten);
2469 }
2470 
2481  VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
2482  << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
2483  << ", events=" << std::hex << eventFlags_;
2485  if (eventFlags_ == EventHandler::NONE) {
2487  return true;
2488  }
2489 
2490  // Always register for persistent events, so we don't have to re-register
2491  // after being called back.
2493  uint16_t(eventFlags_ | EventHandler::PERSIST))) {
2494  eventFlags_ = EventHandler::NONE; // we're not registered after error
2497  withAddr("failed to update AsyncSocket event registration"));
2498  fail("updateEventRegistration", ex);
2499  return false;
2500  }
2501 
2502  return true;
2503 }
2504 
2506  uint16_t oldFlags = eventFlags_;
2507  eventFlags_ |= enable;
2508  eventFlags_ &= ~disable;
2509  if (eventFlags_ == oldFlags) {
2510  return true;
2511  } else {
2512  return updateEventRegistration();
2513  }
2514 }
2515 
2517  // startFail() should only be called once
2518  assert(state_ != StateEnum::ERROR);
2519  assert(getDestructorGuardCount() > 0);
2521  // Ensure that SHUT_READ and SHUT_WRITE are set,
2522  // so all future attempts to read or write will be rejected
2524 
2525  // Cancel any scheduled immediate read.
2528  }
2529 
2533  }
2535 
2536  if (fd_ >= 0) {
2538  doClose();
2539  }
2540 }
2541 
2543  invokeConnectErr(ex);
2544  failAllWrites(ex);
2545 
2546  if (readCallback_) {
2547  ReadCallback* callback = readCallback_;
2548  readCallback_ = nullptr;
2549  callback->readErr(ex);
2550  }
2551 }
2552 
2554  assert(state_ == StateEnum::ERROR);
2555  assert(getDestructorGuardCount() > 0);
2556 
2559  withAddr("socket closing after error"));
2560  invokeAllErrors(ex);
2561 }
2562 
2564  assert(state_ == StateEnum::ERROR);
2565  assert(getDestructorGuardCount() > 0);
2566  invokeAllErrors(ex);
2567 }
2568 
2569 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2570  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2571  << ", state=" << state_ << " host=" << addr_.describe()
2572  << "): failed in " << fn << "(): " << ex.what();
2573  startFail();
2574  finishFail();
2575 }
2576 
2577 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2578  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2579  << ", state=" << state_ << " host=" << addr_.describe()
2580  << "): failed while connecting in " << fn << "(): " << ex.what();
2581  startFail();
2582 
2583  invokeConnectErr(ex);
2584  finishFail(ex);
2585 }
2586 
2587 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2588  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2589  << ", state=" << state_ << " host=" << addr_.describe()
2590  << "): failed while reading in " << fn << "(): " << ex.what();
2591  startFail();
2592 
2593  if (readCallback_ != nullptr) {
2594  ReadCallback* callback = readCallback_;
2595  readCallback_ = nullptr;
2596  callback->readErr(ex);
2597  }
2598 
2599  finishFail();
2600 }
2601 
2603  const char* fn,
2604  const AsyncSocketException& ex) {
2605  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2606  << ", state=" << state_ << " host=" << addr_.describe()
2607  << "): failed while reading message in " << fn << "(): " << ex.what();
2608  startFail();
2609 
2610  if (errMessageCallback_ != nullptr) {
2612  errMessageCallback_ = nullptr;
2613  callback->errMessageError(ex);
2614  }
2615 
2616  finishFail();
2617 }
2618 
2619 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2620  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2621  << ", state=" << state_ << " host=" << addr_.describe()
2622  << "): failed while writing in " << fn << "(): " << ex.what();
2623  startFail();
2624 
2625  // Only invoke the first write callback, since the error occurred while
2626  // writing this request. Let any other pending write callbacks be invoked in
2627  // finishFail().
2628  if (writeReqHead_ != nullptr) {
2629  WriteRequest* req = writeReqHead_;
2630  writeReqHead_ = req->getNext();
2631  WriteCallback* callback = req->getCallback();
2632  uint32_t bytesWritten = req->getTotalBytesWritten();
2633  req->destroy();
2634  if (callback) {
2635  callback->writeErr(bytesWritten, ex);
2636  }
2637  }
2638 
2639  finishFail();
2640 }
2641 
2643  const char* fn,
2644  WriteCallback* callback,
2645  size_t bytesWritten,
2646  const AsyncSocketException& ex) {
2647  // This version of failWrite() is used when the failure occurs before
2648  // we've added the callback to writeReqHead_.
2649  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2650  << ", state=" << state_ << " host=" << addr_.describe()
2651  << "): failed while writing in " << fn << "(): " << ex.what();
2652  startFail();
2653 
2654  if (callback != nullptr) {
2655  callback->writeErr(bytesWritten, ex);
2656  }
2657 
2658  finishFail();
2659 }
2660 
2662  // Invoke writeErr() on all write callbacks.
2663  // This is used when writes are forcibly shutdown with write requests
2664  // pending, or when an error occurs with writes pending.
2665  while (writeReqHead_ != nullptr) {
2666  WriteRequest* req = writeReqHead_;
2667  writeReqHead_ = req->getNext();
2668  WriteCallback* callback = req->getCallback();
2669  if (callback) {
2670  callback->writeErr(req->getTotalBytesWritten(), ex);
2671  }
2672  req->destroy();
2673  }
2674 }
2675 
2677  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2678  << "): connect() called in invalid state " << state_;
2679 
2680  /*
2681  * The invalidState() methods don't use the normal failure mechanisms,
2682  * since we don't know what state we are in. We don't want to call
2683  * startFail()/finishFail() recursively if we are already in the middle of
2684  * cleaning up.
2685  */
2686 
2689  "connect() called with socket in invalid state");
2691  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2692  if (callback) {
2693  callback->connectErr(ex);
2694  }
2695  } else {
2696  // We can't use failConnect() here since connectCallback_
2697  // may already be set to another callback. Invoke this ConnectCallback
2698  // here; any other connectCallback_ will be invoked in finishFail()
2699  startFail();
2700  if (callback) {
2701  callback->connectErr(ex);
2702  }
2703  finishFail();
2704  }
2705 }
2706 
2708  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2709  << "): setErrMessageCB(" << callback << ") called in invalid state "
2710  << state_;
2711 
2714  msgErrQueueSupported
2715  ? "setErrMessageCB() called with socket in invalid state"
2716  : "This platform does not support socket error message notifications");
2717  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2718  if (callback) {
2719  callback->errMessageError(ex);
2720  }
2721  } else {
2722  startFail();
2723  if (callback) {
2724  callback->errMessageError(ex);
2725  }
2726  finishFail();
2727  }
2728 }
2729 
2732  if (connectCallback_) {
2733  ConnectCallback* callback = connectCallback_;
2734  connectCallback_ = nullptr;
2735  callback->connectErr(ex);
2736  }
2737 }
2738 
2741  if (connectCallback_) {
2742  ConnectCallback* callback = connectCallback_;
2743  connectCallback_ = nullptr;
2744  callback->connectSuccess();
2745  }
2746 }
2747 
2749  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2750  << "): setReadCallback(" << callback << ") called in invalid state "
2751  << state_;
2752 
2755  "setReadCallback() called with socket in "
2756  "invalid state");
2757  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2758  if (callback) {
2759  callback->readErr(ex);
2760  }
2761  } else {
2762  startFail();
2763  if (callback) {
2764  callback->readErr(ex);
2765  }
2766  finishFail();
2767  }
2768 }
2769 
2771  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2772  << "): write() called in invalid state " << state_;
2773 
2776  withAddr("write() called with socket in invalid state"));
2777  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2778  if (callback) {
2779  callback->writeErr(0, ex);
2780  }
2781  } else {
2782  startFail();
2783  if (callback) {
2784  callback->writeErr(0, ex);
2785  }
2786  finishFail();
2787  }
2788 }
2789 
2791  if (fd_ == -1) {
2792  return;
2793  }
2794  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
2796  } else {
2797  ::close(fd_);
2798  }
2799  fd_ = -1;
2800 
2801  // we also want to clear the zerocopy maps
2802  // if the fd has been closed
2803  idZeroCopyBufPtrMap_.clear();
2804  idZeroCopyBufInfoMap_.clear();
2805 }
2806 
2807 std::ostream& operator<<(
2808  std::ostream& os,
2809  const AsyncSocket::StateEnum& state) {
2810  os << static_cast<int>(state);
2811  return os;
2812 }
2813 
2815  // Don't use addr_ directly because it may not be initialized
2816  // e.g. if constructed from fd
2818  try {
2819  getPeerAddress(&peer);
2820  getLocalAddress(&local);
2821  } catch (const std::exception&) {
2822  // ignore
2823  } catch (...) {
2824  // ignore
2825  }
2826  return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2827 }
2828 
2830  bufferCallback_ = cb;
2831 }
2832 
2833 } // namespace folly
virtual void readBufferAvailable(std::unique_ptr< IOBuf >) noexcept
void setZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void scheduleImmediateRead() noexcept
Definition: AsyncSocket.h:1031
void setShutdownSocketSet(const std::weak_ptr< ShutdownSocketSet > &wSS)
bool getTFOSucceded() const
void * ptr
virtual ssize_t tfoSendMsg(int fd, struct msghdr *msg, int msg_flags)
static BytesWriteRequest * newRequest(AsyncSocket *socket, WriteCallback *callback, const iovec *ops, uint32_t opCount, uint32_t partialWritten, uint32_t bytesWritten, unique_ptr< IOBuf > &&ioBuf, WriteFlags flags)
Definition: AsyncSocket.cpp:83
const struct iovec * getOps() const
friend std::ostream & operator<<(std::ostream &os, const StateEnum &state)
std::unordered_map< uint32_t, folly::IOBuf * > idZeroCopyBufPtrMap_
Definition: AsyncSocket.h:1223
#define SO_EE_CODE_ZEROCOPY_COPIED
Definition: NetOps.h:47
#define FOLLY_GNU_DISABLE_WARNING(warningName)
Definition: Portability.h:180
virtual void readDataAvailable(size_t len) noexcept=0
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
#define FOLLY_POP_WARNING
Definition: Portability.h:179
flags
Definition: http_parser.h:127
virtual int detachFd()
uint32_t opCount_
number of entries in writeOps_
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
Definition: Types-inl.h:220
#define SO_ZEROCOPY
Definition: NetOps.h:51
bool connecting() const override
Definition: AsyncSocket.h:567
void shutdownWriteNow() override
#define MSG_ZEROCOPY
Definition: NetOps.h:55
void setFromPeerAddress(int socket)
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
ssize_t sendmsg(NetworkSocket socket, const msghdr *message, int flags)
Definition: NetOps.cpp:328
virtual bool hangup() const
virtual void connectSuccess() noexcept=0
std::string withAddr(const std::string &s)
int setSockOpt(int level, int optname, const T *optval)
Definition: AsyncSocket.h:727
#define FOLLY_PUSH_WARNING
Definition: Portability.h:178
int setTCPProfile(int profd)
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
bool isZeroCopyRequest(WriteFlags flags)
bool containsZeroCopyBuf(folly::IOBuf *ptr)
~AsyncSocket() override
std::string sformat(StringPiece fmt, Args &&...args)
Definition: Format.h:280
bool writable() const override
WriteFlags flags_
set for WriteFlags
bool processZeroCopyWriteInProgress() noexcept
virtual WriteResult performWrite(const iovec *vec, uint32_t count, WriteFlags flags, uint32_t *countWritten, uint32_t *partialWritten)
constexpr size_t kIovMax
Definition: SysUio.h:39
void writev(WriteCallback *callback, const iovec *vec, size_t count, WriteFlags flags=WriteFlags::NONE) override
void cacheLocalAddress() const
int setSendBufSize(size_t bufsize)
void invokeAllErrors(const AsyncSocketException &ex)
ssize_t bytesWritten_
bytes written altogether
bool isZeroCopyWriteInProgress() const noexcept
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
virtual void errMessageError(const AsyncSocketException &ex) noexcept=0
uint32_t sendTimeout_
The send timeout, in milliseconds.
Definition: AsyncSocket.h:1233
virtual void connectErr(const AsyncSocketException &ex) noexcept=0
virtual void handleConnect() noexcept
socklen_t getActualSize() const
STL namespace.
std::unique_ptr< EvbChangeCallback > evbChangeCb_
Definition: AsyncSocket.h:1265
void cachePeerAddress() const
double val
Definition: String.cpp:273
bool isPending() const
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
shutdownWrite() called, but we are still waiting on writes to drain
Definition: AsyncSocket.h:964
virtual void setErrMessageCB(ErrMessageCallback *callback)
constexpr bool kOpenSslModeMoveBufferOwnership
bool isZeroCopyMsg(const cmsghdr &cmsg) const
size_t pullAtMost(void *buf, size_t len)
Definition: Cursor.h:407
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
EventBase * eventBase_
The EventBase.
Definition: AsyncSocket.h:1240
folly::SocketAddress addr_
The address we tried to connect to.
Definition: AsyncSocket.h:1230
std::string describe() const
int8_t readErr_
The read error encountered, if any.
Definition: AsyncSocket.h:1238
void attachEventBase(EventBase *eventBase)
void failErrMessageRead(const char *fn, const AsyncSocketException &ex)
requires E e noexcept(noexcept(s.error(std::move(e))))
WriteRequest * getNext() const
Definition: AsyncSocket.h:903
virtual void prepareReadBuffer(void **buf, size_t *buflen)
std::chrono::steady_clock::time_point connectStartTime_
Definition: AsyncSocket.h:1260
void closeNow() override
AsyncSocket * socket_
parent socket
Definition: AsyncSocket.h:933
virtual void setSendMsgParamCB(SendMsgParamsCallback *callback)
bool updateEventRegistration()
void scheduleInitialReadWrite() noexcept
Definition: AsyncSocket.h:1040
#define nullptr
Definition: http_parser.c:41
WriteRequest * writeReqTail_
End of WriteRequest chain.
Definition: AsyncSocket.h:1251
WriteRequest * writeReqHead_
Chain of WriteRequests.
Definition: AsyncSocket.h:1250
bool error() const override
void failAllWrites(const AsyncSocketException &ex)
#define SO_SET_NAMESPACE
Definition: AsyncSocket.h:698
uint8_t shutdownFlags_
Shutdown state (ShutdownFlags)
Definition: AsyncSocket.h:1227
void writeChainImpl(WriteCallback *callback, iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags)
SendMsgParamsCallback * sendMsgParamCallback_
< Callback for retrieving
Definition: AsyncSocket.h:1248
int setRecvBufSize(size_t bufsize)
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds connectTimeout_
Definition: AsyncSocket.h:1263
void fail(const char *fn, const AsyncSocketException &ex)
std::weak_ptr< ShutdownSocketSet > wShutdownSocketSet_
Definition: AsyncSocket.h:1252
bool isSet(WriteFlags a, WriteFlags b)
virtual void connect(ConnectCallback *callback, const folly::SocketAddress &address, int timeout=0, const OptionMap &options=emptyOptionMap, const folly::SocketAddress &bindAddr=anyAddress()) noexcept
void attachEventBase(EventBase *eventBase) override
size_t zeroCopyReenableCounter_
Definition: AsyncSocket.h:1279
void writeChain(WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
BytesWriteRequest(AsyncSocket *socket, WriteCallback *callback, const struct iovec *ops, uint32_t opCount, uint32_t partialBytes, uint32_t bytesWritten, unique_ptr< IOBuf > &&ioBuf, WriteFlags flags)
virtual void handleRead() noexcept
void registerForConnectEvents()
const char * name
Definition: http_parser.c:437
virtual void writeRequest(WriteRequest *req)
bool isLoopCallbackScheduled() const
Definition: EventBase.h:160
struct iovec writeOps_[]
write operation(s) list
#define FOLLY_HAVE_VLA_01
Definition: AsyncSocket.cpp:40
bool setZeroCopy(bool enable)
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
const AsyncSocketException socketShutdownForWritesEx(AsyncSocketException::END_OF_FILE,"socket shutdown for writes")
ImmediateReadCB immediateReadHandler_
LoopCallback for checking read.
Definition: AsyncSocket.h:1243
folly::SocketAddress localAddr_
The address we are connecting from.
Definition: AsyncSocket.h:1231
void invalidState(ConnectCallback *callback)
#define TCP_CONGESTION
void append(WriteRequest *next)
Definition: AsyncSocket.h:915
void destroy() override
void processZeroCopyMsg(const cmsghdr &cmsg)
void ioReady(uint16_t events) noexcept
size_t appBytesReceived_
Num of bytes received from socket.
Definition: AsyncSocket.h:1253
sa_family_t getFamily() const
SocketAddress getPeerAddress() const
static const folly::SocketAddress & anyAddress()
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
IoHandler ioHandler_
A EventHandler to monitor the fd.
Definition: AsyncSocket.h:1242
void write(WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
virtual void getAncillaryData(folly::WriteFlags, void *) noexcept
Definition: AsyncSocket.h:170
void detachEventBase() override
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
uint16_t eventFlags_
EventBase::HandlerFlags settings.
Definition: AsyncSocket.h:1228
uint32_t opsWritten_
complete ops written
bool isInitialized() const
void failRead(const char *fn, const AsyncSocketException &ex)
bool good() const override
int getsockopt(NetworkSocket s, int level, int optname, void *optval, socklen_t *optlen)
Definition: NetOps.cpp:112
size_t zeroCopyReenableThreshold_
Definition: AsyncSocket.h:1278
void timeoutExpired() noexcept
void closeWithReset() override
virtual ErrMessageCallback * getErrMessageCallback() const
static const OptionMap emptyOptionMap
Definition: AsyncSocket.h:378
void shutdown(Counter &)
ErrMessageCallback * errMessageCallback_
TimestampCallback.
Definition: AsyncSocket.h:1246
const int ops
ShutdownSocketSet shutdownSocketSet
WriteTimeout writeTimeout_
A timeout for connect and write.
Definition: AsyncSocket.h:1241
Definition: Traits.h:588
int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
Definition: AsyncSocket.h:156
void failConnect(const char *fn, const AsyncSocketException &ex)
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
socklen_t getAddress(sockaddr_storage *addr) const
virtual ReadResult performRead(void **buf, size_t *buflen, size_t *offset)
uint32_t zeroCopyBufId_
Definition: AsyncSocket.h:1216
void free()
unique_ptr< IOBuf > ioBuf_
underlying IOBuf, or nullptr if N/A
static constexpr bool msgErrQueueSupported
uint32_t getNextZeroCopyBufId()
Definition: AsyncSocket.h:1202
virtual void handleWrite() noexcept
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
virtual WriteResult performWrite()=0
uint32_t opIndex_
current index into writeOps_
uint16_t maxReadsPerEvent_
Max reads per event loop iteration.
Definition: AsyncSocket.h:1234
uint32_t totalBytesWritten_
total bytes written
Definition: AsyncSocket.h:936
void attachEventBase(EventBase *eventBase, InternalEnum internal=InternalEnum::NORMAL)
std::chrono::steady_clock::time_point connectEndTime_
Definition: AsyncSocket.h:1261
#define SO_EE_ORIGIN_ZEROCOPY
Definition: NetOps.h:43
void changeHandlerFD(int fd)
Definition: EventHandler.h:143
virtual size_t handleErrMessages() noexcept
fbstring errnoStr(int err)
Definition: String.cpp:463
virtual void scheduleConnectTimeout()
SocketAddress getLocalAddress() const
int * count
ssize_t recvmsg(NetworkSocket s, msghdr *message, int flags)
Definition: NetOps.cpp:268
std::unordered_map< folly::IOBuf *, IOBufInfo > idZeroCopyBufInfoMap_
Definition: AsyncSocket.h:1224
void bytesWritten(size_t count)
Definition: AsyncSocket.h:924
ReadCallback * readCallback_
ReadCallback.
Definition: AsyncSocket.h:1249
void setReadCB(ReadCallback *callback) override
static std::unique_ptr< IOBuf > takeOwnership(void *buf, std::size_t capacity, FreeFunction freeFn=nullptr, void *userData=nullptr, bool freeOnError=true)
Definition: IOBuf.h:304
void setFromLocalAddress(int socket)
uint32_t getTotalBytesWritten() const
Definition: AsyncSocket.h:911
const char * string
Definition: Conv.cpp:212
WriteFlags unSet(WriteFlags a, WriteFlags b)
StateEnum state_
StateEnum describing current state.
Definition: AsyncSocket.h:1226
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
BufferCallback * bufferCallback_
Definition: AsyncSocket.h:1267
void setZeroCopyReenableThreshold(size_t threshold)
void close() override
static set< string > s
const
Definition: upload.py:398
virtual void invokeConnectSuccess()
EventBase * getEventBase() const override
Definition: AsyncSocket.h:328
void addZeroCopyBuf(std::unique_ptr< folly::IOBuf > &&buf)
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
int fd_
The socket file descriptor.
Definition: AsyncSocket.h:1229
bool scheduleTimeout(uint32_t milliseconds)
bool readable() const override
void setBufferCallback(BufferCallback *cb)
vector< string > vec
Definition: StringTest.cpp:35
virtual void writeSuccess() noexcept=0
virtual uint32_t getAncillaryDataSize(folly::WriteFlags) noexcept
Definition: AsyncSocket.h:180
void writeImpl(WriteCallback *callback, const iovec *vec, size_t count, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE)
bool isPending() const override
bool isScheduled() const
virtual SendMsgParamsCallback * getSendMsgParamsCB() const
bool getZeroCopy() const
Definition: AsyncSocket.h:504
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
void failWrite(const char *fn, WriteCallback *callback, size_t bytesWritten, const AsyncSocketException &ex)
writes have been completely shut down
Definition: AsyncSocket.h:966
virtual void checkForImmediateRead() noexcept
AsyncSocket::WriteResult sendSocketMessage(int fd, struct msghdr *msg, int msg_flags)
ssize_t recv(NetworkSocket s, void *buf, size_t len, int flags)
Definition: NetOps.cpp:180
ReadCallback * getReadCallback() const override
const AsyncSocketException socketClosedLocallyEx(AsyncSocketException::END_OF_FILE,"socket closed locally")
bool isDetachable() const override
ssize_t tfo_sendmsg(int, const struct msghdr *, int)
virtual void handleInitialReadWrite() noexcept
int socketConnect(const struct sockaddr *addr, socklen_t len)
WriteResult performWrite() override
int setCongestionFlavor(const std::string &cname)
virtual void readEOF() noexcept=0
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
virtual void errMessage(const cmsghdr &cmsg) noexcept=0
ThreadPoolListHook * addr
void shutdownWrite() override
size_t appBytesWritten_
Num of bytes written to socket.
Definition: AsyncSocket.h:1254
int setNoDelay(bool noDelay)
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
void adjustZeroCopyFlags(folly::WriteFlags &flags)
WriteCallback * getCallback() const
Definition: AsyncSocket.h:907
virtual void invokeConnectErr(const AsyncSocketException &ex)
bool tfo_succeeded(int)
int setQuickAck(bool quickack)
state
Definition: http_parser.c:272
ConnectCallback * connectCallback_
ConnectCallback.
Definition: AsyncSocket.h:1245
uint32_t partialBytes_
partial bytes of incomplete op written
uint32_t getZeroCopyBufId() const
Definition: AsyncSocket.h:508
int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept
void releaseZeroCopyBuf(uint32_t id)
std::unique_ptr< IOBuf > preReceivedData_
Definition: AsyncSocket.h:1258
virtual void readErr(const AsyncSocketException &ex) noexcept=0