proxygen
TestAsyncTransport Class Reference

#include <TestAsyncTransport.h>

Inheritance diagram for TestAsyncTransport:
folly::AsyncTransportWrapper folly::AsyncTimeout folly::AsyncTransport folly::AsyncReader folly::AsyncWriter folly::DelayedDestruction folly::AsyncSocketBase folly::DelayedDestructionBase

Classes

class  ReadEvent
 
class  WriteEvent
 

Public Member Functions

 TestAsyncTransport (folly::EventBase *eventBase)
 
void setReadCB (AsyncTransportWrapper::ReadCallback *callback) override
 
ReadCallback * getReadCallback () const override
 
void write (AsyncTransportWrapper::WriteCallback *callback, const void *buf, size_t bytes, folly::WriteFlags flags=folly::WriteFlags::NONE) override
 
void writev (AsyncTransportWrapper::WriteCallback *callback, const struct iovec *vec, size_t count, folly::WriteFlags flags=folly::WriteFlags::NONE) override
 
void writeChain (AsyncTransportWrapper::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&iob, folly::WriteFlags flags=folly::WriteFlags::NONE) override
 
void close () override
 
void closeNow () override
 
void shutdownWrite () override
 
void shutdownWriteNow () override
 
void getPeerAddress (folly::SocketAddress *addr) const override
 
void getLocalAddress (folly::SocketAddress *addr) const override
 
bool good () const override
 
bool readable () const override
 
bool connecting () const override
 
bool error () const override
 
void attachEventBase (folly::EventBase *eventBase) override
 
void detachEventBase () override
 
bool isDetachable () const override
 
folly::EventBase * getEventBase () const override
 
void setSendTimeout (uint32_t milliseconds) override
 
uint32_t getSendTimeout () const override
 
void addReadEvent (const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
 
void addReadEvent (folly::IOBufQueue &chain, std::chrono::milliseconds delayFromPrevious)
 
void addReadEvent (const char *buf, std::chrono::milliseconds delayFromPrevious=std::chrono::milliseconds(0))
 
void addReadEOF (std::chrono::milliseconds delayFromPrevious)
 
void addReadError (const folly::AsyncSocketException &ex, std::chrono::milliseconds delayFromPrevious)
 
void startReadEvents ()
 
void pauseWrites ()
 
void resumeWrites ()
 
std::deque< std::shared_ptr< WriteEvent > > * getWriteEvents ()
 
uint32_t getEORCount ()
 
uint32_t getCorkCount ()
 
size_t getAppBytesWritten () const override
 
size_t getRawBytesWritten () const override
 
size_t getAppBytesReceived () const override
 
size_t getRawBytesReceived () const override
 
bool isEorTrackingEnabled () const override
 
void setEorTracking (bool) override
 
- Public Member Functions inherited from folly::AsyncTransportWrapper
void setReadCB (ReadCallback *callback) override=0
 
void write (WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override=0
 
void writev (WriteCallback *callback, const iovec *vec, size_t count, WriteFlags flags=WriteFlags::NONE) override=0
 
void writeChain (WriteCallback *callback, std::unique_ptr< IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override=0
 
virtual const AsyncTransportWrapper * getWrappedTransport () const
 
template<class T >
const T * getUnderlyingTransport () const
 
template<class T >
T * getUnderlyingTransport ()
 
- Public Member Functions inherited from folly::AsyncTransport
virtual void closeWithReset ()
 
virtual bool writable () const
 
virtual bool isPending () const
 
SocketAddress getLocalAddress () const
 
void getAddress (SocketAddress *address) const override
 
SocketAddress getPeerAddress () const
 
virtual ssl::X509UniquePtr getPeerCert () const
 
virtual const X509 * getSelfCert () const
 
virtual const AsyncTransportCertificate * getPeerCertificate () const
 
virtual const AsyncTransportCertificate * getSelfCertificate () const
 
virtual std::string getApplicationProtocol () const noexcept
 
virtual std::string getSecurityProtocol () const
 
virtual bool isReplaySafe () const
 
virtual void setReplaySafetyCallback (ReplaySafetyCallback *callback)
 
- Public Member Functions inherited from folly::DelayedDestruction
virtual void destroy ()
 
bool getDestroyPending () const
 
- Public Member Functions inherited from folly::DelayedDestructionBase
virtual ~DelayedDestructionBase ()=default
 
- Public Member Functions inherited from folly::AsyncSocketBase
virtual ~AsyncSocketBase ()=default
 

Private Types

enum  StateEnum { kStateOpen, kStatePaused, kStateClosed, kStateError }
 
- Private Types inherited from folly::AsyncTimeout
typedef TimeoutManager::InternalEnum InternalEnum
 

Private Member Functions

bool writesAllowed () const
 
 TestAsyncTransport (TestAsyncTransport const &)
 
TestAsyncTransport & operator= (TestAsyncTransport const &)
 
void addReadEvent (const std::shared_ptr< ReadEvent > &event)
 
void scheduleNextReadEvent (proxygen::TimePoint now)
 
void fireNextReadEvent ()
 
void fireOneReadEvent ()
 
void failPendingWrites ()
 
void timeoutExpired () noexcept override
 
- Private Member Functions inherited from folly::AsyncTimeout
 AsyncTimeout (TimeoutManager *timeoutManager)
 
 AsyncTimeout (EventBase *eventBase)
 
 AsyncTimeout (TimeoutManager *timeoutManager, InternalEnum internal)
 
 AsyncTimeout (EventBase *eventBase, InternalEnum internal)
 
 AsyncTimeout ()
 
virtual ~AsyncTimeout ()
 
bool scheduleTimeout (uint32_t milliseconds)
 
bool scheduleTimeout (TimeoutManager::timeout_type timeout)
 
void cancelTimeout ()
 
bool isScheduled () const
 
void attachTimeoutManager (TimeoutManager *timeoutManager, InternalEnum internal=InternalEnum::NORMAL)
 
void attachEventBase (EventBase *eventBase, InternalEnum internal=InternalEnum::NORMAL)
 
void detachTimeoutManager ()
 
void detachEventBase ()
 
const TimeoutManager * getTimeoutManager ()
 
struct event * getEvent ()
 

Private Attributes

folly::EventBase * eventBase_
 
folly::AsyncTransportWrapper::ReadCallback * readCallback_
 
uint32_t sendTimeout_
 
proxygen::TimePoint prevReadEventTime_ {}
 
proxygen::TimePoint nextReadEventTime_ {}
 
StateEnum readState_
 
StateEnum writeState_
 
std::deque< std::shared_ptr< ReadEvent > > readEvents_
 
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
 
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
 
uint32_t eorCount_ {0}
 
uint32_t corkCount_ {0}
 

Additional Inherited Members

- Public Types inherited from folly::AsyncTransportWrapper
using UniquePtr = std::unique_ptr< AsyncTransportWrapper, Destructor >
 
using ReadCallback = AsyncReader::ReadCallback
 
using WriteCallback = AsyncWriter::WriteCallback
 
- Public Types inherited from folly::AsyncTransport
typedef std::unique_ptr< AsyncTransport, Destructor > UniquePtr
 
- Protected Member Functions inherited from folly::AsyncTransport
 ~AsyncTransport () override=default
 
- Protected Member Functions inherited from folly::DelayedDestruction
 ~DelayedDestruction () override=default
 
 DelayedDestruction ()
 
- Protected Member Functions inherited from folly::DelayedDestructionBase
 DelayedDestructionBase ()
 
uint32_t getDestructorGuardCount () const
 
- Protected Member Functions inherited from folly::AsyncReader
virtual ~AsyncReader ()=default
 
- Protected Member Functions inherited from folly::AsyncWriter
virtual ~AsyncWriter ()=default
 
- Static Private Member Functions inherited from folly::AsyncTimeout
template<typename TCallback >
static std::unique_ptr< AsyncTimeout > make (TimeoutManager &manager, TCallback &&callback)
 
template<typename TCallback >
static std::unique_ptr< AsyncTimeout > schedule (TimeoutManager::timeout_type timeout, TimeoutManager &manager, TCallback &&callback)
 

Detailed Description

Definition at line 19 of file TestAsyncTransport.h.

Member Enumeration Documentation

Enumerator
kStateOpen 
kStatePaused 
kStateClosed 
kStateError 

Definition at line 121 of file TestAsyncTransport.h.

Constructor & Destructor Documentation

TestAsyncTransport::TestAsyncTransport ( folly::EventBase * eventBase)
explicit

Definition at line 168 of file TestAsyncTransport.cpp.

Referenced by writesAllowed().

169  : AsyncTimeout(eventBase),
170  eventBase_(eventBase),
171  readCallback_(nullptr),
172  sendTimeout_(0),
175  readEvents_()
176 {
177 }
folly::EventBase * eventBase_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
folly::AsyncTransportWrapper::ReadCallback * readCallback_
TestAsyncTransport::TestAsyncTransport ( TestAsyncTransport const &  )
private

Member Function Documentation

void TestAsyncTransport::addReadEOF ( std::chrono::milliseconds  delayFromPrevious)

Definition at line 455 of file TestAsyncTransport.cpp.

References addReadEvent().

455  {
456  addReadEvent(nullptr, 0, delayFromPrevious);
457 }
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
void TestAsyncTransport::addReadError ( const folly::AsyncSocketException & ex,
std::chrono::milliseconds  delayFromPrevious 
)

Definition at line 460 of file TestAsyncTransport.cpp.

References addReadEvent(), and readEvents_.

461  {
462  if (!readEvents_.empty() && readEvents_.back()->isFinalEvent()) {
463  LOG(FATAL) << "cannot add a read error after an error or EOF";
464  }
465 
466  auto event = std::make_shared<ReadEvent>(ex, delayFromPrevious);
467  addReadEvent(event);
468 }
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
std::deque< std::shared_ptr< ReadEvent > > readEvents_
void TestAsyncTransport::addReadEvent ( const void *  buf,
size_t  buflen,
std::chrono::milliseconds  delayFromPrevious 
)

Definition at line 438 of file TestAsyncTransport.cpp.

References readEvents_.

Referenced by addReadEOF(), addReadError(), addReadEvent(), and writesAllowed().

439  {
440  if (!readEvents_.empty() && readEvents_.back()->isFinalEvent()) {
441  LOG(FATAL) << "cannot add more read events after an error or EOF";
442  }
443 
444  auto event = std::make_shared<ReadEvent>(buf, buflen, delayFromPrevious);
445  addReadEvent(event);
446 }
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
std::deque< std::shared_ptr< ReadEvent > > readEvents_
void TestAsyncTransport::addReadEvent ( folly::IOBufQueue & chain,
std::chrono::milliseconds  delayFromPrevious 
)

Definition at line 426 of file TestAsyncTransport.cpp.

References addReadEvent(), folly::IOBuf::data(), folly::IOBuf::length(), and folly::IOBufQueue::pop_front().

427  {
428  while (true) {
429  unique_ptr<IOBuf> cur = chain.pop_front();
430  if (!cur) {
431  break;
432  }
433  addReadEvent(cur->data(), cur->length(), delayFromPrevious);
434  }
435 }
const uint8_t * data() const
Definition: IOBuf.h:499
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
std::size_t length() const
Definition: IOBuf.h:533
std::unique_ptr< folly::IOBuf > pop_front()
Definition: IOBufQueue.cpp:316
void TestAsyncTransport::addReadEvent ( const char *  buf,
std::chrono::milliseconds  delayFromPrevious = std::chrono::milliseconds(0) 
)

Definition at line 449 of file TestAsyncTransport.cpp.

References addReadEvent().

450  {
451  addReadEvent(buf, strlen(buf), delayFromPrevious);
452 }
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
void TestAsyncTransport::addReadEvent ( const std::shared_ptr< ReadEvent > &  event)
private

Definition at line 471 of file TestAsyncTransport.cpp.

References proxygen::getCurrentTime(), nextReadEventTime_, prevReadEventTime_, readCallback_, readEvents_, scheduleNextReadEvent(), and proxygen::timePointInitialized().

471  {
472  bool firstEvent = readEvents_.empty();
473  readEvents_.push_back(event);
474 
475  if (!firstEvent) {
476  return;
477  }
479  return;
480  }
481 
482  nextReadEventTime_ = prevReadEventTime_ + event->getDelay();
483  if (readCallback_ == nullptr) {
484  return;
485  }
486 
488 }
proxygen::TimePoint nextReadEventTime_
proxygen::TimePoint prevReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
folly::AsyncTransportWrapper::ReadCallback * readCallback_
bool timePointInitialized(const T &time)
Definition: Time.h:35
void scheduleNextReadEvent(proxygen::TimePoint now)
void TestAsyncTransport::attachEventBase ( folly::EventBase * eventBase)
override virtual

Attach the transport to an EventBase.

This may only be called if the transport is not currently attached to an EventBase (by an earlier call to detachEventBase()).

This method must be invoked in the EventBase's thread.

Implements folly::AsyncTransport.

Definition at line 351 of file TestAsyncTransport.cpp.

References eventBase_, and readCallback_.

351  {
352  CHECK(nullptr == eventBase_);
353  CHECK(nullptr == readCallback_);
354  eventBase_ = eventBase;
355 }
folly::EventBase * eventBase_
folly::AsyncTransportWrapper::ReadCallback * readCallback_
void TestAsyncTransport::close ( )
override virtual

Close the transport.

This gracefully closes the transport, waiting for all pending write requests to complete before actually closing the underlying transport.

If a read callback is set, readEOF() will be called immediately. If there are outstanding write requests, the close will be delayed until all remaining writes have completed. No new writes may be started after close() has been called.

Implements folly::AsyncTransport.

Definition at line 285 of file TestAsyncTransport.cpp.

References closeNow().

285  {
286  closeNow();
287 }
void closeNow() override
void TestAsyncTransport::closeNow ( )
override virtual

Close the transport immediately.

This closes the transport immediately, dropping any outstanding data waiting to be written.

If a read callback is set, readEOF() will be called immediately. If there are outstanding write requests, these requests will be aborted and writeError() will be invoked immediately on all outstanding write callbacks.

Implements folly::AsyncTransport.

Definition at line 290 of file TestAsyncTransport.cpp.

References kStateClosed, kStateOpen, readCallback_, folly::AsyncReader::ReadCallback::readEOF(), readState_, and shutdownWriteNow().

Referenced by close().

290  {
291  if (readState_ == kStateOpen) {
293 
294  if (readCallback_ != nullptr) {
296  readCallback_ = nullptr;
297  callback->readEOF();
298  }
299  }
301 }
void shutdownWriteNow() override
folly::AsyncTransportWrapper::ReadCallback * readCallback_
virtual void readEOF() noexcept=0
bool TestAsyncTransport::connecting ( ) const
override virtual

Determine if transport is connected to the endpoint

Returns
false iff the transport is connected, otherwise true

Implements folly::AsyncTransport.

Definition at line 341 of file TestAsyncTransport.cpp.

341  {
342  return false;
343 }
void TestAsyncTransport::detachEventBase ( )
override virtual

Detach the transport from its EventBase.

This may only be called when the transport is idle and has no reads or writes pending. Once detached, the transport may not be used again until it is re-attached to an EventBase by calling attachEventBase().

This method must be called from the current EventBase's thread.

Implements folly::AsyncTransport.

Definition at line 358 of file TestAsyncTransport.cpp.

References eventBase_, and readCallback_.

358  {
359  CHECK_NOTNULL(eventBase_);
360  CHECK(nullptr == readCallback_);
361  eventBase_ = nullptr;
362 }
folly::EventBase * eventBase_
folly::AsyncTransportWrapper::ReadCallback * readCallback_
bool TestAsyncTransport::error ( ) const
override virtual

Determine if an error has occurred with this transport.

Returns
true iff an error has occurred (not EOF).

Implements folly::AsyncTransport.

Definition at line 346 of file TestAsyncTransport.cpp.

References kStateError, readState_, and writeState_.

void TestAsyncTransport::failPendingWrites ( )
private

Definition at line 410 of file TestAsyncTransport.cpp.

References g(), pendingWriteEvents_, and folly::AsyncWriter::WriteCallback::writeErr().

Referenced by shutdownWriteNow(), and writesAllowed().

410  {
411  // writeError() callback might try to delete this object
412  DestructorGuard g(this);
413  while (!pendingWriteEvents_.empty()) {
414  auto event = pendingWriteEvents_.front();
415  pendingWriteEvents_.pop_front();
416  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
417  "Transport closed locally");
418  auto cb = dynamic_cast<WriteCallback*>(event.second);
419  if (cb) {
420  cb->writeErr(0, ex);
421  }
422  }
423 }
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
g_t g(f_t)
void writeErr(size_t nBytesWritten, const folly::AsyncSocketException &ex) noexcept override
void TestAsyncTransport::fireNextReadEvent ( )
private

Definition at line 518 of file TestAsyncTransport.cpp.

References folly::netops::bind(), eventBase_, fireOneReadEvent(), proxygen::getCurrentTime(), nextReadEventTime_, now(), readCallback_, readEvents_, folly::EventBase::runInLoop(), folly::AsyncTimeout::scheduleTimeout(), and proxygen::timePointInitialized().

Referenced by scheduleNextReadEvent(), timeoutExpired(), and writesAllowed().

518  {
519  DestructorGuard dg(this);
520  CHECK(!readEvents_.empty());
521  CHECK_NOTNULL(readCallback_);
522 
523  // maxReadAtOnce prevents us from starving other users of this EventBase
524  unsigned int const maxReadAtOnce = 30;
525  for (unsigned int n = 0; n < maxReadAtOnce; ++n) {
527 
528  if (readCallback_ == nullptr || eventBase_ == nullptr ||
530  return;
531  }
532  auto now = proxygen::getCurrentTime();
533  if (nextReadEventTime_ > now) {
534  scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>
535  (nextReadEventTime_ - now));
536  return;
537  }
538  }
539 
540  // Trigger fireNextReadEvent() to be called the next time around the event
541  // loop.
543  this));
544 }
folly::EventBase * eventBase_
std::chrono::steady_clock::time_point now()
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
proxygen::TimePoint nextReadEventTime_
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
std::deque< std::shared_ptr< ReadEvent > > readEvents_
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
bool scheduleTimeout(uint32_t milliseconds)
folly::AsyncTransportWrapper::ReadCallback * readCallback_
bool timePointInitialized(const T &time)
Definition: Time.h:35
void TestAsyncTransport::fireOneReadEvent ( )
private

Definition at line 547 of file TestAsyncTransport.cpp.

References folly::AsyncReader::ReadCallback::getReadBuffer(), kStateClosed, kStateError, nextReadEventTime_, prevReadEventTime_, readCallback_, folly::AsyncReader::ReadCallback::readDataAvailable(), folly::AsyncReader::ReadCallback::readEOF(), folly::AsyncReader::ReadCallback::readErr(), readEvents_, readState_, and writeState_.

Referenced by fireNextReadEvent(), and writesAllowed().

547  {
548  CHECK(!readEvents_.empty());
549  CHECK_NOTNULL(readCallback_);
550 
551  const shared_ptr<ReadEvent>& event = readEvents_.front();
552 
553  // Note that we call getReadBuffer() here even if we know the next event may
554  // be an EOF or an error. This matches the behavior of AsyncSocket.
555  // (Because AsyncSocket merely gets notification that the socket is readable,
556  // and has to call getReadBuffer() before it can make the actual read call to
557  // get an error or EOF.)
558  void* buf;
559  size_t buflen;
560  try {
561  readCallback_->getReadBuffer(&buf, &buflen);
562  } catch (...) {
563  // TODO: we should convert the error to a AsyncSocketException and call
564  // readError() here.
565  LOG(FATAL) << "readCallback_->getReadBuffer() threw an error";
566  }
567  if (buf == nullptr || buflen == 0) {
568  // TODO: we should just call readError() here.
569  LOG(FATAL) << "readCallback_->getReadBuffer() returned a nullptr or "
570  "empty buffer";
571  }
572 
573  // Handle errors
574  if (event->isError()) {
575  // Errors immediately move both read and write to an error state
578 
579  // event is just a reference to the shared_ptr, so make a real copy of the
580  // pointer before popping it off the readEvents_ list.
581  shared_ptr<ReadEvent> eventPointerCopy = readEvents_.front();
582  readEvents_.pop_front();
583  CHECK(readEvents_.empty());
584  nextReadEventTime_ = {};
585 
586  auto callback = readCallback_;
587  readCallback_ = nullptr;
588  callback->readErr(eventPointerCopy->getException());
589  return;
590  }
591 
592  // Handle EOF
593  size_t available = event->getLength();
594  if (available == 0) {
596 
597  readEvents_.pop_front();
598  CHECK(readEvents_.empty());
599  nextReadEventTime_ = {};
600 
601  auto callback = readCallback_;
602  readCallback_ = nullptr;
603  callback->readEOF();
604  return;
605  }
606 
607  // Handle a normal read event
608  size_t readlen;
609  bool more;
610  if (available <= buflen) {
611  readlen = available;
612  more = false;
613  } else {
614  readlen = buflen;
615  more = true;
616  }
617  memcpy(buf, event->getBuffer(), readlen);
618  if (more) {
619  event->consumeData(readlen);
620  } else {
622  // Note: since event is just a reference to the shared_ptr in readEvents_,
623  // we shouldn't access the event any more after popping it off here.
624  readEvents_.pop_front();
625 
626  if (readEvents_.empty()) {
627  nextReadEventTime_ = {};
628  } else {
629  nextReadEventTime_ = prevReadEventTime_ + readEvents_.front()->getDelay();
630  }
631  }
633 }
virtual void readDataAvailable(size_t len) noexcept=0
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
proxygen::TimePoint nextReadEventTime_
proxygen::TimePoint prevReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
folly::AsyncTransportWrapper::ReadCallback * readCallback_
virtual void readEOF() noexcept=0
virtual void readErr(const AsyncSocketException &ex) noexcept=0
size_t TestAsyncTransport::getAppBytesReceived ( ) const
inline override virtual

Implements folly::AsyncTransport.

Definition at line 115 of file TestAsyncTransport.h.

115 { return 0; }
size_t TestAsyncTransport::getAppBytesWritten ( ) const
inline override virtual

Implements folly::AsyncTransport.

Definition at line 113 of file TestAsyncTransport.h.

113 { return 0; }
uint32_t TestAsyncTransport::getCorkCount ( )
inline

Definition at line 109 of file TestAsyncTransport.h.

References corkCount_.

109  {
110  return corkCount_;
111  }
uint32_t TestAsyncTransport::getEORCount ( )
inline

Definition at line 105 of file TestAsyncTransport.h.

References eorCount_.

105  {
106  return eorCount_;
107  }
EventBase * TestAsyncTransport::getEventBase ( ) const
override virtual

Implements folly::AsyncSocketBase.

Definition at line 370 of file TestAsyncTransport.cpp.

References eventBase_.

370  {
371  return eventBase_;
372 }
folly::EventBase * eventBase_
void TestAsyncTransport::getLocalAddress ( folly::SocketAddress * address) const
override virtual

Get the address of the local endpoint of this transport.

This function may throw AsyncSocketException on error.

Parameters
address: The local address will be stored in the specified SocketAddress.

Implements folly::AsyncTransport.

Definition at line 325 of file TestAsyncTransport.cpp.

References folly::SocketAddress::setFromIpPort().

325  {
326  // This isn't really accurate, but close enough for testing.
327  addr->setFromIpPort("127.0.0.1", 0);
328 }
ThreadPoolListHook * addr
void TestAsyncTransport::getPeerAddress ( folly::SocketAddress * address) const
override virtual

Get the address of the remote endpoint to which this transport is connected.

This function may throw AsyncSocketException on error.

Parameters
address: The remote endpoint's address will be stored in the specified SocketAddress.

Implements folly::AsyncTransport.

Definition at line 319 of file TestAsyncTransport.cpp.

References folly::SocketAddress::setFromIpPort().

319  {
320  // This isn't really accurate, but close enough for testing.
321  addr->setFromIpPort("127.0.0.1", 0);
322 }
ThreadPoolListHook * addr
size_t TestAsyncTransport::getRawBytesReceived ( ) const
inline override virtual

Implements folly::AsyncTransport.

Definition at line 116 of file TestAsyncTransport.h.

116 { return 0; }
size_t TestAsyncTransport::getRawBytesWritten ( ) const
inline override virtual

Implements folly::AsyncTransport.

Definition at line 114 of file TestAsyncTransport.h.

114 { return 0; }
TestAsyncTransport::ReadCallback * TestAsyncTransport::getReadCallback ( ) const
override virtual

Implements folly::AsyncTransportWrapper.

Definition at line 225 of file TestAsyncTransport.cpp.

References readCallback_.

225  {
226  return dynamic_cast<TestAsyncTransport::ReadCallback*>(readCallback_);
227 }
folly::AsyncTransportWrapper::ReadCallback * readCallback_
uint32_t TestAsyncTransport::getSendTimeout ( ) const
override virtual

Get the send timeout.

Returns
Returns the current send timeout, in milliseconds. A return value of 0 indicates that no timeout is set.

Implements folly::AsyncTransport.

Definition at line 380 of file TestAsyncTransport.cpp.

References sendTimeout_.

380  {
381  return sendTimeout_;
382 }
std::deque< std::shared_ptr<WriteEvent> >* TestAsyncTransport::getWriteEvents ( )
inline

Definition at line 101 of file TestAsyncTransport.h.

References writeEvents_.

101  {
102  return &writeEvents_;
103  }
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
bool TestAsyncTransport::good ( ) const
override virtual

Determine if transport is open and ready to read or write.

Note that this function returns false on EOF; you must also call error() to distinguish between an EOF and an error.

Returns
true iff the transport is open and ready, false otherwise.

Implements folly::AsyncTransport.

Definition at line 331 of file TestAsyncTransport.cpp.

References kStateOpen, readState_, and writesAllowed().

bool TestAsyncTransport::isDetachable ( ) const
override virtual

Determine if the transport can be detached.

This method must be called from the current EventBase's thread.

Implements folly::AsyncTransport.

Definition at line 365 of file TestAsyncTransport.cpp.

365  {
366  return true;
367 }
bool TestAsyncTransport::isEorTrackingEnabled ( ) const
inline override virtual
Returns
True iff end of record tracking is enabled

Implements folly::AsyncTransport.

Definition at line 117 of file TestAsyncTransport.h.

117 { return false; }
TestAsyncTransport& TestAsyncTransport::operator= ( TestAsyncTransport const &  )
private

Referenced by writesAllowed().

void TestAsyncTransport::pauseWrites ( )

Definition at line 385 of file TestAsyncTransport.cpp.

References kStateOpen, kStatePaused, and writeState_.

385  {
386  if (writeState_ != kStateOpen) {
387  LOG(FATAL) << "cannot pause writes on non-open transport; state=" <<
388  writeState_;
389  }
391 }
bool TestAsyncTransport::readable ( ) const
overridevirtual

Determine if the transport is readable or not.

Returns
true iff the transport is readable, false otherwise.

Implements folly::AsyncTransport.

Definition at line 336 of file TestAsyncTransport.cpp.

336  {
337  return false;
338 }
void TestAsyncTransport::resumeWrites ( )

Definition at line 394 of file TestAsyncTransport.cpp.

References kStateOpen, kStatePaused, pendingWriteEvents_, writeEvents_, and writeState_.

394  {
395  if (writeState_ != kStatePaused) {
396  LOG(FATAL) << "cannot resume writes on non-paused transport; state=" <<
397  writeState_;
398  }
400  for (auto event = pendingWriteEvents_.begin();
401  event != pendingWriteEvents_.end() && writeState_ == kStateOpen;
402  event = pendingWriteEvents_.begin()) {
403  writeEvents_.push_back(event->first);
404  pendingWriteEvents_.pop_front();
405  event->second->writeSuccess();
406  }
407 }
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
void TestAsyncTransport::scheduleNextReadEvent ( proxygen::TimePoint  now)
private

Definition at line 508 of file TestAsyncTransport.cpp.

References fireNextReadEvent(), nextReadEventTime_, and folly::AsyncTimeout::scheduleTimeout().

Referenced by addReadEvent(), setReadCB(), startReadEvents(), and writesAllowed().

508  {
509  if (nextReadEventTime_ <= now) {
511  } else {
512  scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>
513  (nextReadEventTime_ - now));
514  }
515 }
std::chrono::steady_clock::time_point now()
proxygen::TimePoint nextReadEventTime_
bool scheduleTimeout(uint32_t milliseconds)
void TestAsyncTransport::setEorTracking ( bool  )
inline override virtual

Implements folly::AsyncTransport.

Definition at line 118 of file TestAsyncTransport.h.

118 { return; }
void TestAsyncTransport::setReadCB ( AsyncTransportWrapper::ReadCallback *  callback)
override

Definition at line 180 of file TestAsyncTransport.cpp.

References folly::AsyncTimeout::cancelTimeout(), proxygen::getCurrentTime(), kStateClosed, kStateError, kStateOpen, nextReadEventTime_, folly::AsyncSocketException::NOT_OPEN, readCallback_, folly::AsyncReader::ReadCallback::readEOF(), readEvents_, readState_, scheduleNextReadEvent(), and proxygen::timePointInitialized().

180  {
181  if (readCallback_ == callback) {
182  return;
183  }
184 
185  if (callback == nullptr) {
186  cancelTimeout();
187  readCallback_ = nullptr;
188  return;
189  }
190 
191  bool wasNull = (readCallback_ == nullptr);
192 
193  if (readState_ == kStateClosed) {
194  callback->readEOF();
195  return;
196  } else if (readState_ == kStateError) {
198  "setReadCB() called with socket in "
199  "invalid state");
200  callback->readErr(ex);
201  return;
202  }
203 
204  CHECK_EQ(readState_, kStateOpen);
205  readCallback_ = callback;
206 
207  // If the callback was previously nullptr, read events were paused, so we need
208  // to reschedule them now.
209  //
210  // If it was set before, read events are still scheduled, so we are done now
211  // and can return.
212  if (!wasNull) {
213  return;
214  }
215 
217  // Either readEvents_ is empty, or startReadEvents() hasn't been called yet
218  return;
219  }
220  CHECK(!readEvents_.empty());
222 }
proxygen::TimePoint nextReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
folly::AsyncTransportWrapper::ReadCallback * readCallback_
virtual void readEOF() noexcept=0
bool timePointInitialized(const T &time)
Definition: Time.h:35
void scheduleNextReadEvent(proxygen::TimePoint now)
void TestAsyncTransport::setSendTimeout ( uint32_t  milliseconds)
override virtual

Set the send timeout.

If write requests do not make any progress for more than the specified number of milliseconds, fail all pending writes and close the transport.

If write requests are currently pending when setSendTimeout() is called, the timeout interval is immediately restarted using the new value.

Parameters
milliseconds: The timeout duration, in milliseconds. If 0, no timeout will be used.

Implements folly::AsyncTransport.

Definition at line 375 of file TestAsyncTransport.cpp.

References sendTimeout_, and uint32_t.

375  {
376  sendTimeout_ = milliseconds;
377 }
void TestAsyncTransport::shutdownWrite ( )
override virtual

Perform a half-shutdown of the write side of the transport.

The caller should not make any more calls to write() or writev() after shutdownWrite() is called. Any future write attempts will fail immediately.

Not all transport types support half-shutdown. If the underlying transport does not support half-shutdown, it will fully shut down both the read and write sides of the transport. (Fully shutting down the socket is better than doing nothing at all, since the caller may rely on the shutdownWrite() call to notify the other end of the connection that no more data can be read.)

If there is pending data still waiting to be written on the transport, the actual shutdown will be delayed until the pending data has been written.

Note: There is no corresponding shutdownRead() equivalent. Simply uninstall the read callback if you wish to stop reading. (On TCP sockets at least, shutting down the read side of the socket is a no-op anyway.)

Implements folly::AsyncTransport.

Definition at line 304 of file TestAsyncTransport.cpp.

References shutdownWriteNow().

304  {
306 }
void shutdownWriteNow() override
void TestAsyncTransport::shutdownWriteNow ( )
override virtual

Perform a half-shutdown of the write side of the transport.

shutdownWriteNow() is identical to shutdownWrite(), except that it immediately performs the shutdown, rather than waiting for pending writes to complete. Any pending write requests will be immediately failed when shutdownWriteNow() is called.

Implements folly::AsyncTransport.

Definition at line 309 of file TestAsyncTransport.cpp.

References failPendingWrites(), g(), kStateClosed, kStateOpen, kStatePaused, and writeState_.

Referenced by closeNow(), and shutdownWrite().

void TestAsyncTransport::startReadEvents ( )

Definition at line 491 of file TestAsyncTransport.cpp.

References proxygen::getCurrentTime(), nextReadEventTime_, now(), prevReadEventTime_, readCallback_, readEvents_, and scheduleNextReadEvent().

491  {
492  auto now = proxygen::getCurrentTime();
494 
495  if (readEvents_.empty()) {
496  return;
497  }
498  nextReadEventTime_ = prevReadEventTime_ + readEvents_.front()->getDelay();
499 
500  if (readCallback_ == nullptr) {
501  return;
502  }
503 
505 }
std::chrono::steady_clock::time_point now()
proxygen::TimePoint nextReadEventTime_
proxygen::TimePoint prevReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
folly::AsyncTransportWrapper::ReadCallback * readCallback_
void scheduleNextReadEvent(proxygen::TimePoint now)
void TestAsyncTransport::timeoutExpired ( )
override private virtual noexcept

timeoutExpired() is invoked when the timeout period has expired.

Implements folly::AsyncTimeout.

Definition at line 636 of file TestAsyncTransport.cpp.

References fireNextReadEvent(), readCallback_, and readEvents_.

Referenced by writesAllowed().

636  {
637  CHECK_NOTNULL(readCallback_);
638  CHECK(!readEvents_.empty());
640 }
std::deque< std::shared_ptr< ReadEvent > > readEvents_
folly::AsyncTransportWrapper::ReadCallback * readCallback_
void TestAsyncTransport::write ( AsyncTransportWrapper::WriteCallback *  callback,
const void *  buf,
size_t  bytes,
folly::WriteFlags  flags = folly::WriteFlags::NONE 
)
override

Definition at line 230 of file TestAsyncTransport.cpp.

References writev().

232  {
233  iovec op;
234  op.iov_base = const_cast<void*>(buf);
235  op.iov_len = bytes;
236  this->writev(callback, &op, 1, flags);
237 }
flags
Definition: http_parser.h:127
void writev(AsyncTransportWrapper::WriteCallback *callback, const struct iovec *vec, size_t count, folly::WriteFlags flags=folly::WriteFlags::NONE) override
void TestAsyncTransport::writeChain ( AsyncTransportWrapper::WriteCallback *  callback,
std::unique_ptr< folly::IOBuf > &&  iob,
folly::WriteFlags  flags = folly::WriteFlags::NONE 
)
override

Definition at line 268 of file TestAsyncTransport.cpp.

References count, folly::IOBuf::data(), i, folly::IOBuf::length(), cpp.ast::next(), folly::IOBuf::next(), uint8_t, and writev().

270  {
271  size_t count = iob->countChainElements();
272  iovec vec[count];
273  const IOBuf* head = iob.get();
274  const IOBuf* next = head;
275  unsigned i = 0;
276  do {
277  vec[i].iov_base = const_cast<uint8_t *>(next->data());
278  vec[i++].iov_len = next->length();
279  next = next->next();
280  } while (next != head);
281  this->writev(callback, vec, count, flags);
282 }
flags
Definition: http_parser.h:127
const uint8_t * data() const
Definition: IOBuf.h:499
size_t countChainElements() const
Definition: IOBuf.cpp:493
std::size_t length() const
Definition: IOBuf.h:533
Definition: Traits.h:588
IOBuf * next()
Definition: IOBuf.h:600
int * count
void writev(AsyncTransportWrapper::WriteCallback *callback, const struct iovec *vec, size_t count, folly::WriteFlags flags=folly::WriteFlags::NONE) override
def next(obj)
Definition: ast.py:58
void TestAsyncTransport::writev ( AsyncTransportWrapper::WriteCallback *  callback,
const struct iovec *  vec,
size_t  count,
folly::WriteFlags  flags = folly::WriteFlags::NONE 
)
override

Definition at line 240 of file TestAsyncTransport.cpp.

References corkCount_, eorCount_, folly::isSet(), kStateOpen, kStatePaused, TestAsyncTransport::WriteEvent::newEvent(), pendingWriteEvents_, writeEvents_, writesAllowed(), and writeState_.

Referenced by write(), and writeChain().

242  {
243  if (isSet(flags, WriteFlags::CORK)) {
244  corkCount_++;
245  } else if (isSet(flags, WriteFlags::EOR)) {
246  eorCount_++;
247  }
248  if (!writesAllowed()) {
249  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
250  "write() called on non-open TestAsyncTransport");
251  auto cb = dynamic_cast<WriteCallback*>(callback);
252  DCHECK(cb);
253  cb->writeErr(0, ex);
254  return;
255  }
256 
257  shared_ptr<WriteEvent> event = WriteEvent::newEvent(vec, count);
258  if (writeState_ == kStatePaused || pendingWriteEvents_.size() > 0) {
259  pendingWriteEvents_.push_back(std::make_pair(event, callback));
260  } else {
261  CHECK_EQ(writeState_, kStateOpen);
262  writeEvents_.push_back(event);
263  callback->writeSuccess();
264  }
265 }
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
flags
Definition: http_parser.h:127
bool isSet(WriteFlags a, WriteFlags b)
static std::shared_ptr< WriteEvent > newEvent(const struct iovec *vec, size_t count)
bool writesAllowed() const
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
Definition: Traits.h:588
int * count

Member Data Documentation

uint32_t TestAsyncTransport::corkCount_ {0}
private

Definition at line 160 of file TestAsyncTransport.h.

Referenced by getCorkCount(), and writev().

uint32_t TestAsyncTransport::eorCount_ {0}
private

Definition at line 159 of file TestAsyncTransport.h.

Referenced by getEORCount(), and writev().

folly::EventBase* TestAsyncTransport::eventBase_
private
proxygen::TimePoint TestAsyncTransport::nextReadEventTime_ {}
private
std::deque< std::pair<std::shared_ptr<WriteEvent>, AsyncTransportWrapper::WriteCallback*> > TestAsyncTransport::pendingWriteEvents_
private

Definition at line 157 of file TestAsyncTransport.h.

Referenced by failPendingWrites(), resumeWrites(), and writev().

proxygen::TimePoint TestAsyncTransport::prevReadEventTime_ {}
private

Definition at line 150 of file TestAsyncTransport.h.

Referenced by addReadEvent(), fireOneReadEvent(), and startReadEvents().

std::deque< std::shared_ptr<ReadEvent> > TestAsyncTransport::readEvents_
private
StateEnum TestAsyncTransport::readState_
private

Definition at line 152 of file TestAsyncTransport.h.

Referenced by closeNow(), error(), fireOneReadEvent(), good(), and setReadCB().

uint32_t TestAsyncTransport::sendTimeout_
private

Definition at line 148 of file TestAsyncTransport.h.

Referenced by getSendTimeout(), and setSendTimeout().

std::deque< std::shared_ptr<WriteEvent> > TestAsyncTransport::writeEvents_
private

Definition at line 155 of file TestAsyncTransport.h.

Referenced by getWriteEvents(), resumeWrites(), and writev().

StateEnum TestAsyncTransport::writeState_
private

The documentation for this class was generated from the following files: