24 using std::unique_ptr;
33 VLOG(5) <<
"AsyncPipeReader(this=" <<
this <<
", fd=" <<
fd_ 34 <<
"): failed while reading: " << ex.what();
61 VLOG(5) <<
"AsyncPipeReader::handlerReady() this=" <<
this <<
", fd=" <<
fd_;
71 std::unique_ptr<IOBuf> ioBuf;
80 }
catch (
const std::exception& ex) {
83 string(
"ReadCallback::getReadBuffer() " 84 "threw exception: ") +
91 string(
"ReadCallback::getReadBuffer() " 92 "threw non-exception type"));
96 if (buf ==
nullptr || buflen == 0) {
99 string(
"ReadCallback::getReadBuffer() " 100 "returned empty buffer"));
111 ioBuf->
append(std::size_t(bytesRead));
120 if (static_cast<size_t>(bytesRead) < buflen) {
123 }
else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
126 }
else if (bytesRead < 0) {
132 assert(bytesRead == 0);
146 unique_ptr<folly::IOBuf> buf,
156 bool wasEmpty = (queue_.empty());
159 std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
165 CHECK(!queue_.empty());
172 std::unique_ptr<folly::IOBuf>&& buf,
178 VLOG(5) <<
"close on empty";
179 if (queue_.empty()) {
182 closeOnEmpty_ =
true;
188 VLOG(5) <<
"close now";
189 if (!queue_.empty()) {
207 while (!queue_.empty()) {
210 if (queue_.front().second) {
211 queue_.front().second->writeErr(0, ex);
225 assert(!queue_.empty());
227 auto& front = queue_.front();
229 DCHECK(!curQueue.
empty());
232 CHECK(head->length());
235 if (errno == EAGAIN || errno == EWOULDBLOCK) {
237 VLOG(5) <<
"write blocked";
246 }
else if (rc == 0) {
251 if (curQueue.
empty()) {
252 auto cb = front.second;
258 VLOG(5) <<
"partial write blocked";
260 }
while (!queue_.empty());
virtual void readBufferAvailable(std::unique_ptr< IOBuf >) noexcept
const folly::IOBuf * front() const
virtual void readDataAvailable(size_t len) noexcept=0
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
void write(const T &in, folly::io::Appender &appender)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
~AsyncPipeReader() override
constexpr detail::Map< Move > move
ssize_t readNoInt(int fd, void *buf, size_t count)
virtual bool isBufferMovable() noexcept
void writeChain(folly::AsyncWriter::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
void handlerReady(uint16_t events) noexceptoverride
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
std::size_t capacity() const
void failAllWrites(const AsyncSocketException &ex)
AsyncReader::ReadCallback * readCallback_
uint8_t * writableBuffer()
void handlerReady(uint16_t events) noexceptoverride
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
ssize_t writeNoInt(int fd, const void *buf, size_t count)
virtual size_t maxBufferSize() const
void write(std::unique_ptr< folly::IOBuf > iob, AsyncWriter::WriteCallback *wcb=nullptr)
void changeHandlerFD(int fd)
std::function< void(int)> closeCb_
void failRead(const AsyncSocketException &ex)
void trimStart(size_t amount)
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
virtual void readEOF() noexcept=0
bool registerHandler(uint16_t events)
void append(std::size_t amount)
bool isHandlerRegistered() const
virtual void readErr(const AsyncSocketException &ex) noexcept=0