24 using std::shared_ptr;
25 using std::unique_ptr;
33 ReadEvent(
const void* buf,
size_t buflen, std::chrono::milliseconds delay)
46 buffer_ =
static_cast<char*
>(malloc(buflen));
48 throw std::bad_alloc();
117 for (
size_t n = 0; n <
count_; ++n) {
118 vec_[n].iov_base =
nullptr;
123 for (
size_t n = 0; n <
count_; ++n) {
128 shared_ptr<TestAsyncTransport::WriteEvent>
131 size_t bufLen =
sizeof(
WriteEvent) + (count *
sizeof(
struct iovec));
132 void* buf = malloc(bufLen);
133 if (buf ==
nullptr) {
134 throw std::bad_alloc();
138 shared_ptr<WriteEvent> event(
new(buf)
WriteEvent(
now, count),
140 for (
size_t n = 0; n <
count; ++n) {
141 size_t len = vec[n].iov_len;
142 event->vec_[n].iov_len = len;
144 event->vec_[n].iov_base =
nullptr;
148 event->vec_[n].iov_base = malloc(len);
149 if (event->vec_[n].iov_base ==
nullptr) {
150 throw std::bad_alloc();
152 memcpy(event->vec_[n].iov_base, vec[n].iov_base, len);
160 event->~WriteEvent();
185 if (callback ==
nullptr) {
198 "setReadCB() called with socket in " 200 callback->readErr(ex);
231 const void* buf,
size_t bytes,
234 op.iov_base =
const_cast<void*
>(buf);
236 this->
writev(callback, &op, 1, flags);
243 if (
isSet(flags, WriteFlags::CORK)) {
245 }
else if (
isSet(flags, WriteFlags::EOR)) {
250 "write() called on non-open TestAsyncTransport");
263 callback->writeSuccess();
269 std::unique_ptr<folly::IOBuf>&& iob,
271 size_t count = iob->countChainElements();
273 const IOBuf* head = iob.get();
278 vec[i++].iov_len = next->
length();
280 }
while (next != head);
281 this->
writev(callback, vec, count, flags);
387 LOG(FATAL) <<
"cannot pause writes on non-open transport; state=" <<
396 LOG(FATAL) <<
"cannot resume writes on non-paused transport; state=" <<
405 event->second->writeSuccess();
417 "Transport closed locally");
427 std::chrono::milliseconds delayFromPrevious) {
429 unique_ptr<IOBuf> cur = chain.
pop_front();
439 std::chrono::milliseconds delayFromPrevious) {
441 LOG(FATAL) <<
"cannot add more read events after an error or EOF";
444 auto event = std::make_shared<ReadEvent>(buf, buflen, delayFromPrevious);
450 std::chrono::milliseconds delayFromPrevious) {
461 std::chrono::milliseconds delayFromPrevious) {
463 LOG(FATAL) <<
"cannot add a read error after an error or EOF";
466 auto event = std::make_shared<ReadEvent>(ex, delayFromPrevious);
524 unsigned int const maxReadAtOnce = 30;
525 for (
unsigned int n = 0; n < maxReadAtOnce; ++n) {
551 const shared_ptr<ReadEvent>&
event =
readEvents_.front();
565 LOG(FATAL) <<
"readCallback_->getReadBuffer() threw an error";
567 if (buf ==
nullptr || buflen == 0) {
569 LOG(FATAL) <<
"readCallback_->getReadBuffer() returned a nullptr or " 574 if (event->isError()) {
581 shared_ptr<ReadEvent> eventPointerCopy =
readEvents_.front();
588 callback->
readErr(eventPointerCopy->getException());
593 size_t available =
event->getLength();
594 if (available == 0) {
610 if (available <= buflen) {
617 memcpy(buf, event->getBuffer(), readlen);
619 event->consumeData(readlen);
void attachEventBase(folly::EventBase *eventBase) override
std::deque< std::shared_ptr< WriteEvent > > writeEvents_
void shutdownWrite() override
virtual void readDataAvailable(size_t len) noexcept=0
void detachEventBase() override
folly::EventBase * getEventBase() const override
folly::EventBase * eventBase_
void addReadError(const folly::AsyncSocketException &ex, std::chrono::milliseconds delayFromPrevious)
uint32_t getSendTimeout() const override
WriteEvent(proxygen::TimePoint time, size_t count)
void setSendTimeout(uint32_t milliseconds) override
std::chrono::milliseconds getDelay() const
std::chrono::steady_clock::time_point now()
ReadCallback * getReadCallback() const override
void writeChain(AsyncTransportWrapper::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&iob, folly::WriteFlags flags=folly::WriteFlags::NONE) override
const uint8_t * data() const
void consumeData(size_t length)
std::chrono::milliseconds delay_
bool isFinalEvent() const
—— Concurrent Priority Queue Implementation ——
ReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delay)
requires E e noexcept(noexcept(s.error(std::move(e))))
const char * getBuffer() const
bool error() const override
void addReadEvent(const void *buf, size_t buflen, std::chrono::milliseconds delayFromPrevious)
bool isSet(WriteFlags a, WriteFlags b)
static std::shared_ptr< WriteEvent > newEvent(const struct iovec *vec, size_t count)
void shutdownWriteNow() override
bool readable() const override
void runInLoop(LoopCallback *callback, bool thisIteration=false)
void timeoutExpired() noexcept override
SocketAddress getPeerAddress() const
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
bool connecting() const override
std::size_t length() const
bool writesAllowed() const
std::deque< std::pair< std::shared_ptr< WriteEvent >, AsyncTransportWrapper::WriteCallback * > > pendingWriteEvents_
proxygen::TimePoint nextReadEventTime_
bool isDetachable() const override
ReadEvent(const folly::AsyncSocketException &ex, std::chrono::milliseconds delay)
void setFromIpPort(const char *ip, uint16_t port)
SocketAddress getLocalAddress() const
SteadyClock::time_point TimePoint
void setReadCB(AsyncTransportWrapper::ReadCallback *callback) override
bool good() const override
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
proxygen::TimePoint prevReadEventTime_
std::deque< std::shared_ptr< ReadEvent > > readEvents_
void addReadEOF(std::chrono::milliseconds delayFromPrevious)
std::chrono::time_point< ClockType > getCurrentTime()
bool scheduleTimeout(uint32_t milliseconds)
void writev(AsyncTransportWrapper::WriteCallback *callback, const struct iovec *vec, size_t count, folly::WriteFlags flags=folly::WriteFlags::NONE) override
folly::AsyncTransportWrapper::ReadCallback * readCallback_
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
static void destroyEvent(WriteEvent *event)
void write(AsyncTransportWrapper::WriteCallback *callback, const void *buf, size_t bytes, folly::WriteFlags flags=folly::WriteFlags::NONE) override
const folly::AsyncSocketException & getException() const
virtual void readEOF() noexcept=0
bool timePointInitialized(const T &time)
ThreadPoolListHook * addr
StatsClock::time_point TimePoint
TestAsyncTransport(folly::EventBase *eventBase)
std::chrono::nanoseconds time()
std::unique_ptr< folly::IOBuf > pop_front()
folly::AsyncSocketException exception_
virtual void readErr(const AsyncSocketException &ex) noexcept=0
void scheduleNextReadEvent(proxygen::TimePoint now)