proxygen
TestAsyncTransport.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 #include <folly/SocketAddress.h>
13 #include <folly/io/IOBuf.h>
16 
17 using folly::WriteFlags;
20 using folly::EventBase;
21 using folly::IOBuf;
24 using std::shared_ptr;
25 using std::unique_ptr;
26 
27 /*
28  * TestAsyncTransport::ReadEvent
29  */
30 
32  public:
33  ReadEvent(const void* buf, size_t buflen, std::chrono::milliseconds delay)
34  : buffer_(nullptr),
37  isError_(false),
39  delay_(delay) {
40  if (buflen == 0) {
41  // This means EOF
42  return;
43  }
44  CHECK_NOTNULL(buf);
45 
46  buffer_ = static_cast<char*>(malloc(buflen));
47  if (buffer_ == nullptr) {
48  throw std::bad_alloc();
49  }
50  memcpy(buffer_, buf, buflen);
52  dataEnd_ = buffer_ + buflen;
53  }
54 
55  ReadEvent(const folly::AsyncSocketException& ex, std::chrono::milliseconds delay)
56  : buffer_(nullptr),
59  isError_(true),
60  exception_(ex),
61  delay_(delay) {}
62 
64  free(buffer_);
65  }
66 
67  std::chrono::milliseconds getDelay() const {
68  return delay_;
69  }
70 
71  bool isFinalEvent() const {
72  return buffer_ == nullptr;
73  }
74 
75  const char* getBuffer() const {
76  return readStart_;
77  }
78 
79  size_t getLength() const {
80  return (dataEnd_ - readStart_);
81  }
82 
83  void consumeData(size_t length) {
84  CHECK_LE(readStart_ + length, dataEnd_);
85  readStart_ += length;
86  }
87 
88  bool isError() const {
89  return isError_;
90  }
91 
93  return exception_;
94  }
95 
96  private:
97  char* buffer_;
98  char* readStart_;
99  char* dataEnd_;
100 
101  bool isError_;
103 
104  std::chrono::milliseconds delay_;
105 };
106 
107 /*
108  * TestAsyncTransport::WriteEvent methods
109  */
110 
112  : time_(time),
113  count_(count) {
114  // Initialize all of the iov_base pointers to nullptr. This way we won't free
115  // an uninitialized pointer in the destructor if we fail to allocate any of
116  // the buffers.
117  for (size_t n = 0; n < count_; ++n) {
118  vec_[n].iov_base = nullptr;
119  }
120 }
121 
123  for (size_t n = 0; n < count_; ++n) {
124  free(vec_[n].iov_base);
125  }
126 }
127 
128 shared_ptr<TestAsyncTransport::WriteEvent>
130  size_t count) {
131  size_t bufLen = sizeof(WriteEvent) + (count * sizeof(struct iovec));
132  void* buf = malloc(bufLen);
133  if (buf == nullptr) {
134  throw std::bad_alloc();
135  }
136 
137  auto now = proxygen::getCurrentTime();
138  shared_ptr<WriteEvent> event(new(buf) WriteEvent(now, count),
139  destroyEvent);
140  for (size_t n = 0; n < count; ++n) {
141  size_t len = vec[n].iov_len;
142  event->vec_[n].iov_len = len;
143  if (len == 0) {
144  event->vec_[n].iov_base = nullptr;
145  continue;
146  }
147 
148  event->vec_[n].iov_base = malloc(len);
149  if (event->vec_[n].iov_base == nullptr) {
150  throw std::bad_alloc();
151  }
152  memcpy(event->vec_[n].iov_base, vec[n].iov_base, len);
153  }
154 
155  return event;
156 }
157 
158 void
160  event->~WriteEvent();
161  free(event);
162 }
163 
164 /*
165  * TestAsyncTransport methods
166  */
167 
169  : AsyncTimeout(eventBase),
170  eventBase_(eventBase),
172  sendTimeout_(0),
175  readEvents_()
176 {
177 }
178 
179 void
180 TestAsyncTransport::setReadCB(AsyncTransportWrapper::ReadCallback* callback) {
181  if (readCallback_ == callback) {
182  return;
183  }
184 
185  if (callback == nullptr) {
186  cancelTimeout();
187  readCallback_ = nullptr;
188  return;
189  }
190 
191  bool wasNull = (readCallback_ == nullptr);
192 
193  if (readState_ == kStateClosed) {
194  callback->readEOF();
195  return;
196  } else if (readState_ == kStateError) {
198  "setReadCB() called with socket in "
199  "invalid state");
200  callback->readErr(ex);
201  return;
202  }
203 
204  CHECK_EQ(readState_, kStateOpen);
205  readCallback_ = callback;
206 
207  // If the callback was previously nullptr, read events were paused, so we need
208  // to reschedule them now.
209  //
210  // If it was set before, read events are still scheduled, so we are done now
211  // and can return.
212  if (!wasNull) {
213  return;
214  }
215 
217  // Either readEvents_ is empty, or startReadEvents() hasn't been called yet
218  return;
219  }
220  CHECK(!readEvents_.empty());
222 }
223 
226  return dynamic_cast<TestAsyncTransport::ReadCallback*>(readCallback_);
227 }
228 
229 void
230 TestAsyncTransport::write(AsyncTransportWrapper::WriteCallback* callback,
231  const void* buf, size_t bytes,
232  WriteFlags flags) {
233  iovec op;
234  op.iov_base = const_cast<void*>(buf);
235  op.iov_len = bytes;
236  this->writev(callback, &op, 1, flags);
237 }
238 
239 void
240 TestAsyncTransport::writev(AsyncTransportWrapper::WriteCallback* callback,
241  const iovec* vec, size_t count,
242  WriteFlags flags) {
243  if (isSet(flags, WriteFlags::CORK)) {
244  corkCount_++;
245  } else if (isSet(flags, WriteFlags::EOR)) {
246  eorCount_++;
247  }
248  if (!writesAllowed()) {
249  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
250  "write() called on non-open TestAsyncTransport");
251  auto cb = dynamic_cast<WriteCallback*>(callback);
252  DCHECK(cb);
253  cb->writeErr(0, ex);
254  return;
255  }
256 
257  shared_ptr<WriteEvent> event = WriteEvent::newEvent(vec, count);
258  if (writeState_ == kStatePaused || pendingWriteEvents_.size() > 0) {
259  pendingWriteEvents_.push_back(std::make_pair(event, callback));
260  } else {
261  CHECK_EQ(writeState_, kStateOpen);
262  writeEvents_.push_back(event);
263  callback->writeSuccess();
264  }
265 }
266 
267 void
268 TestAsyncTransport::writeChain(AsyncTransportWrapper::WriteCallback* callback,
269  std::unique_ptr<folly::IOBuf>&& iob,
270  WriteFlags flags) {
271  size_t count = iob->countChainElements();
272  iovec vec[count];
273  const IOBuf* head = iob.get();
274  const IOBuf* next = head;
275  unsigned i = 0;
276  do {
277  vec[i].iov_base = const_cast<uint8_t *>(next->data());
278  vec[i++].iov_len = next->length();
279  next = next->next();
280  } while (next != head);
281  this->writev(callback, vec, count, flags);
282 }
283 
284 void
286  closeNow();
287 }
288 
289 void
291  if (readState_ == kStateOpen) {
293 
294  if (readCallback_ != nullptr) {
296  readCallback_ = nullptr;
297  callback->readEOF();
298  }
299  }
301 }
302 
303 void
306 }
307 
308 void
310  DestructorGuard g(this);
312  if (writeState_ == kStateOpen ||
315  }
316 }
317 
318 void
320  // This isn't really accurate, but close enough for testing.
321  addr->setFromIpPort("127.0.0.1", 0);
322 }
323 
324 void
326  // This isn't really accurate, but close enough for testing.
327  addr->setFromIpPort("127.0.0.1", 0);
328 }
329 
330 bool
332  return (readState_ == kStateOpen && writesAllowed());
333 }
334 
335 bool
337  return false;
338 }
339 
340 bool
342  return false;
343 }
344 
345 bool
347  return (readState_ == kStateError || writeState_ == kStateError);
348 }
349 
350 void
352  CHECK(nullptr == eventBase_);
353  CHECK(nullptr == readCallback_);
354  eventBase_ = eventBase;
355 }
356 
357 void
359  CHECK_NOTNULL(eventBase_);
360  CHECK(nullptr == readCallback_);
361  eventBase_ = nullptr;
362 }
363 
364 bool
366  return true;
367 }
368 
369 EventBase*
371  return eventBase_;
372 }
373 
374 void
376  sendTimeout_ = milliseconds;
377 }
378 
379 uint32_t
381  return sendTimeout_;
382 }
383 
384 void
386  if (writeState_ != kStateOpen) {
387  LOG(FATAL) << "cannot pause writes on non-open transport; state=" <<
388  writeState_;
389  }
391 }
392 
393 void
395  if (writeState_ != kStatePaused) {
396  LOG(FATAL) << "cannot resume writes on non-paused transport; state=" <<
397  writeState_;
398  }
400  for (auto event = pendingWriteEvents_.begin();
401  event != pendingWriteEvents_.end() && writeState_ == kStateOpen;
402  event = pendingWriteEvents_.begin()) {
403  writeEvents_.push_back(event->first);
404  pendingWriteEvents_.pop_front();
405  event->second->writeSuccess();
406  }
407 }
408 
409 void
411  // writeError() callback might try to delete this object
412  DestructorGuard g(this);
413  while (!pendingWriteEvents_.empty()) {
414  auto event = pendingWriteEvents_.front();
415  pendingWriteEvents_.pop_front();
416  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
417  "Transport closed locally");
418  auto cb = dynamic_cast<WriteCallback*>(event.second);
419  if (cb) {
420  cb->writeErr(0, ex);
421  }
422  }
423 }
424 
425 void
427  std::chrono::milliseconds delayFromPrevious) {
428  while (true) {
429  unique_ptr<IOBuf> cur = chain.pop_front();
430  if (!cur) {
431  break;
432  }
433  addReadEvent(cur->data(), cur->length(), delayFromPrevious);
434  }
435 }
436 
437 void
438 TestAsyncTransport::addReadEvent(const void* buf, size_t buflen,
439  std::chrono::milliseconds delayFromPrevious) {
440  if (!readEvents_.empty() && readEvents_.back()->isFinalEvent()) {
441  LOG(FATAL) << "cannot add more read events after an error or EOF";
442  }
443 
444  auto event = std::make_shared<ReadEvent>(buf, buflen, delayFromPrevious);
445  addReadEvent(event);
446 }
447 
448 void
450  std::chrono::milliseconds delayFromPrevious) {
451  addReadEvent(buf, strlen(buf), delayFromPrevious);
452 }
453 
454 void
455 TestAsyncTransport::addReadEOF(std::chrono::milliseconds delayFromPrevious) {
456  addReadEvent(nullptr, 0, delayFromPrevious);
457 }
458 
459 void
461  std::chrono::milliseconds delayFromPrevious) {
462  if (!readEvents_.empty() && readEvents_.back()->isFinalEvent()) {
463  LOG(FATAL) << "cannot add a read error after an error or EOF";
464  }
465 
466  auto event = std::make_shared<ReadEvent>(ex, delayFromPrevious);
467  addReadEvent(event);
468 }
469 
470 void
471 TestAsyncTransport::addReadEvent(const shared_ptr<ReadEvent>& event) {
472  bool firstEvent = readEvents_.empty();
473  readEvents_.push_back(event);
474 
475  if (!firstEvent) {
476  return;
477  }
479  return;
480  }
481 
482  nextReadEventTime_ = prevReadEventTime_ + event->getDelay();
483  if (readCallback_ == nullptr) {
484  return;
485  }
486 
488 }
489 
490 void
492  auto now = proxygen::getCurrentTime();
494 
495  if (readEvents_.empty()) {
496  return;
497  }
498  nextReadEventTime_ = prevReadEventTime_ + readEvents_.front()->getDelay();
499 
500  if (readCallback_ == nullptr) {
501  return;
502  }
503 
505 }
506 
507 void
509  if (nextReadEventTime_ <= now) {
511  } else {
512  scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>
513  (nextReadEventTime_ - now));
514  }
515 }
516 
517 void
519  DestructorGuard dg(this);
520  CHECK(!readEvents_.empty());
521  CHECK_NOTNULL(readCallback_);
522 
523  // maxReadAtOnce prevents us from starving other users of this EventBase
524  unsigned int const maxReadAtOnce = 30;
525  for (unsigned int n = 0; n < maxReadAtOnce; ++n) {
527 
528  if (readCallback_ == nullptr || eventBase_ == nullptr ||
530  return;
531  }
532  auto now = proxygen::getCurrentTime();
533  if (nextReadEventTime_ > now) {
534  scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>
535  (nextReadEventTime_ - now));
536  return;
537  }
538  }
539 
540  // Trigger fireNextReadEvent() to be called the next time around the event
541  // loop.
543  this));
544 }
545 
546 void
548  CHECK(!readEvents_.empty());
549  CHECK_NOTNULL(readCallback_);
550 
551  const shared_ptr<ReadEvent>& event = readEvents_.front();
552 
553  // Note that we call getReadBuffer() here even if we know the next event may
554  // be an EOF or an error. This matches the behavior of AsyncSocket.
555  // (Because AsyncSocket merely gets notification that the socket is readable,
556  // and has to call getReadBuffer() before it can make the actual read call to
557  // get an error or EOF.)
558  void* buf;
559  size_t buflen;
560  try {
561  readCallback_->getReadBuffer(&buf, &buflen);
562  } catch (...) {
563  // TODO: we should convert the error to a AsyncSocketException and call
564  // readError() here.
565  LOG(FATAL) << "readCallback_->getReadBuffer() threw an error";
566  }
567  if (buf == nullptr || buflen == 0) {
568  // TODO: we should just call readError() here.
569  LOG(FATAL) << "readCallback_->getReadBuffer() returned a nullptr or "
570  "empty buffer";
571  }
572 
573  // Handle errors
574  if (event->isError()) {
575  // Errors immediately move both read and write to an error state
578 
579  // event is just a reference to the shared_ptr, so make a real copy of the
580  // pointer before popping it off the readEvents_ list.
581  shared_ptr<ReadEvent> eventPointerCopy = readEvents_.front();
582  readEvents_.pop_front();
583  CHECK(readEvents_.empty());
584  nextReadEventTime_ = {};
585 
586  auto callback = readCallback_;
587  readCallback_ = nullptr;
588  callback->readErr(eventPointerCopy->getException());
589  return;
590  }
591 
592  // Handle EOF
593  size_t available = event->getLength();
594  if (available == 0) {
596 
597  readEvents_.pop_front();
598  CHECK(readEvents_.empty());
599  nextReadEventTime_ = {};
600 
601  auto callback = readCallback_;
602  readCallback_ = nullptr;
603  callback->readEOF();
604  return;
605  }
606 
607  // Handle a normal read event
608  size_t readlen;
609  bool more;
610  if (available <= buflen) {
611  readlen = available;
612  more = false;
613  } else {
614  readlen = buflen;
615  more = true;
616  }
617  memcpy(buf, event->getBuffer(), readlen);
618  if (more) {
619  event->consumeData(readlen);
620  } else {
622  // Note: since event is just a reference to the shared_ptr in readEvents_,
623  // we shouldn't access the event any more after popping it off here.
624  readEvents_.pop_front();
625 
626  if (readEvents_.empty()) {
627  nextReadEventTime_ = {};
628  } else {
629  nextReadEventTime_ = prevReadEventTime_ + readEvents_.front()->getDelay();
630  }
631  }
633 }
634 
635 void
637  CHECK_NOTNULL(readCallback_);
638  CHECK(!readEvents_.empty());
640 }
void attachEventBase(folly::EventBase *eventBase) override
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
void shutdownWrite() override
virtual void readDataAvailable(size_t len) noexcept=0
flags
Definition: http_parser.h:127
void detachEventBase() override
folly::EventBase * getEventBase() const override
folly::EventBase * eventBase_
void addReadError(const folly::AsyncSocketException &ex, std::chrono::milliseconds delayFromPrevious)
uint32_t getSendTimeout() const override
WriteEvent(proxygen::TimePoint time, size_t count)
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds getDelay() const
std::chrono::steady_clock::time_point now()
ReadCallback * getReadCallback() const override
void writeChain(AsyncTransportWrapper::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&iob, folly::WriteFlags flags=folly::WriteFlags::NONE) override
const uint8_t * data() const
Definition: IOBuf.h:499
std::chrono::milliseconds delay_
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
ReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delay)
requires E e noexcept(noexcept(s.error(std::move(e))))
#define nullptr
Definition: http_parser.c:41
bool error() const override
string UNKNOWN
Definition: tokenize.py:53
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
bool isSet(WriteFlags a, WriteFlags b)
static std::shared_ptr< WriteEvent > newEvent(const struct iovec *vec, size_t count)
void shutdownWriteNow() override
bool readable() const override
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
void timeoutExpired() noexceptoverride
SocketAddress getPeerAddress() const
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
bool connecting() const override
std::size_t length() const
Definition: IOBuf.h:533
bool writesAllowed() const
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
Definition: Traits.h:588
proxygen::TimePoint nextReadEventTime_
IOBuf * next()
Definition: IOBuf.h:600
void free()
bool isDetachable() const override
ReadEvent(const folly::AsyncSocketException &ex, std::chrono::milliseconds delay)
void setFromIpPort(const char *ip, uint16_t port)
SocketAddress getLocalAddress() const
int * count
SteadyClock::time_point TimePoint
Definition: Time.h:25
void setReadCB(AsyncTransportWrapper::ReadCallback *callback) override
bool good() const override
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
g_t g(f_t)
proxygen::TimePoint prevReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
void addReadEOF(std::chrono::milliseconds delayFromPrevious)
std::chrono::time_point< ClockType > getCurrentTime()
Definition: Time.h:41
bool scheduleTimeout(uint32_t milliseconds)
void writev(AsyncTransportWrapper::WriteCallback *callback, const struct iovec *vec, size_t count, folly::WriteFlags flags=folly::WriteFlags::NONE) override
folly::AsyncTransportWrapper::ReadCallback * readCallback_
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
static void destroyEvent(WriteEvent *event)
void closeNow() override
void write(AsyncTransportWrapper::WriteCallback *callback, const void *buf, size_t bytes, folly::WriteFlags flags=folly::WriteFlags::NONE) override
const folly::AsyncSocketException & getException() const
virtual void readEOF() noexcept=0
bool timePointInitialized(const T &time)
Definition: Time.h:35
ThreadPoolListHook * addr
StatsClock::time_point TimePoint
TestAsyncTransport(folly::EventBase *eventBase)
std::chrono::nanoseconds time()
std::unique_ptr< folly::IOBuf > pop_front()
Definition: IOBufQueue.cpp:316
folly::AsyncSocketException exception_
def next(obj)
Definition: ast.py:58
virtual void readErr(const AsyncSocketException &ex) noexcept=0
void scheduleNextReadEvent(proxygen::TimePoint now)