proxygen
AsyncPipe.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/FileUtil.h>
20 
21 using folly::IOBuf;
22 using folly::IOBufQueue;
23 using std::string;
24 using std::unique_ptr;
25 
26 namespace folly {
27 
29  close();
30 }
31 
33  VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_
34  << "): failed while reading: " << ex.what();
35 
36  DCHECK(readCallback_ != nullptr);
38  readCallback_ = nullptr;
39  callback->readErr(ex);
40  close();
41 }
42 
45  if (fd_ >= 0) {
46  changeHandlerFD(-1);
47 
48  if (closeCb_) {
49  closeCb_(fd_);
50  } else {
51  ::close(fd_);
52  }
53  fd_ = -1;
54  }
55 }
56 
58  DestructorGuard dg(this);
59  CHECK(events & EventHandler::READ);
60 
61  VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62  assert(readCallback_ != nullptr);
63 
64  while (readCallback_) {
65  // - What API does callback support?
66  const auto movable = readCallback_->isBufferMovable(); // noexcept
67 
68  // Get the buffer to read into.
69  void* buf = nullptr;
70  size_t buflen = 0;
71  std::unique_ptr<IOBuf> ioBuf;
72 
73  if (movable) {
75  buf = ioBuf->writableBuffer();
76  buflen = ioBuf->capacity();
77  } else {
78  try {
79  readCallback_->getReadBuffer(&buf, &buflen);
80  } catch (const std::exception& ex) {
83  string("ReadCallback::getReadBuffer() "
84  "threw exception: ") +
85  ex.what());
86  failRead(aex);
87  return;
88  } catch (...) {
91  string("ReadCallback::getReadBuffer() "
92  "threw non-exception type"));
93  failRead(aex);
94  return;
95  }
96  if (buf == nullptr || buflen == 0) {
99  string("ReadCallback::getReadBuffer() "
100  "returned empty buffer"));
101  failRead(aex);
102  return;
103  }
104  }
105 
106  // Perform the read
107  ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
108 
109  if (bytesRead > 0) {
110  if (movable) {
111  ioBuf->append(std::size_t(bytesRead));
113  } else {
114  readCallback_->readDataAvailable(size_t(bytesRead));
115  }
116  // Fall through and continue around the loop if the read
117  // completely filled the available buffer.
118  // Note that readCallback_ may have been uninstalled or changed inside
119  // readDataAvailable().
120  if (static_cast<size_t>(bytesRead) < buflen) {
121  return;
122  }
123  } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
124  // No more data to read right now.
125  return;
126  } else if (bytesRead < 0) {
128  AsyncSocketException::INVALID_STATE, "read failed", errno);
129  failRead(ex);
130  return;
131  } else {
132  assert(bytesRead == 0);
133  // EOF
134 
137  readCallback_ = nullptr;
138  callback->readEOF();
139  return;
140  }
141  // Max reads per loop?
142  }
143 }
144 
146  unique_ptr<folly::IOBuf> buf,
147  AsyncWriter::WriteCallback* callback) {
148  if (closed()) {
149  if (callback) {
151  AsyncSocketException::NOT_OPEN, "attempt to write to closed pipe");
152  callback->writeErr(0, ex);
153  }
154  return;
155  }
156  bool wasEmpty = (queue_.empty());
157  folly::IOBufQueue iobq;
158  iobq.append(std::move(buf));
159  std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
160  std::move(iobq), callback);
161  queue_.emplace_back(std::move(p));
162  if (wasEmpty) {
163  handleWrite();
164  } else {
165  CHECK(!queue_.empty());
166  CHECK(isHandlerRegistered());
167  }
168 }
169 
172  std::unique_ptr<folly::IOBuf>&& buf,
173  WriteFlags) {
174  write(std::move(buf), callback);
175 }
176 
178  VLOG(5) << "close on empty";
179  if (queue_.empty()) {
180  closeNow();
181  } else {
182  closeOnEmpty_ = true;
183  CHECK(isHandlerRegistered());
184  }
185 }
186 
188  VLOG(5) << "close now";
189  if (!queue_.empty()) {
190  failAllWrites(AsyncSocketException(
191  AsyncSocketException::NOT_OPEN, "closed with pending writes"));
192  }
193  if (fd_ >= 0) {
195  changeHandlerFD(-1);
196  if (closeCb_) {
197  closeCb_(fd_);
198  } else {
199  close(fd_);
200  }
201  fd_ = -1;
202  }
203 }
204 
206  DestructorGuard dg(this);
207  while (!queue_.empty()) {
208  // the first entry of the queue could have had a partial write, but needs to
209  // be tracked.
210  if (queue_.front().second) {
211  queue_.front().second->writeErr(0, ex);
212  }
213  queue_.pop_front();
214  }
215 }
216 
218  CHECK(events & EventHandler::WRITE);
219 
220  handleWrite();
221 }
222 
224  DestructorGuard dg(this);
225  assert(!queue_.empty());
226  do {
227  auto& front = queue_.front();
228  folly::IOBufQueue& curQueue = front.first;
229  DCHECK(!curQueue.empty());
230  // someday, support writev. The logic for partial writes is a bit complex
231  const IOBuf* head = curQueue.front();
232  CHECK(head->length());
233  ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
234  if (rc < 0) {
235  if (errno == EAGAIN || errno == EWOULDBLOCK) {
236  // pipe is full
237  VLOG(5) << "write blocked";
239  return;
240  } else {
241  failAllWrites(AsyncSocketException(
242  AsyncSocketException::INTERNAL_ERROR, "write failed", errno));
243  closeNow();
244  return;
245  }
246  } else if (rc == 0) {
248  return;
249  }
250  curQueue.trimStart(size_t(rc));
251  if (curQueue.empty()) {
252  auto cb = front.second;
253  queue_.pop_front();
254  if (cb) {
255  cb->writeSuccess();
256  }
257  } else {
258  VLOG(5) << "partial write blocked";
259  }
260  } while (!queue_.empty());
261 
262  if (closeOnEmpty_) {
263  closeNow();
264  } else {
266  }
267 }
268 
269 } // namespace folly
virtual void readBufferAvailable(std::unique_ptr< IOBuf >) noexcept
const folly::IOBuf * front() const
Definition: IOBufQueue.h:476
virtual void readDataAvailable(size_t len) noexcept=0
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
~AsyncPipeReader() override
Definition: AsyncPipe.cpp:28
bool empty() const
Definition: IOBufQueue.h:503
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ssize_t readNoInt(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:102
virtual bool isBufferMovable() noexcept
void writeChain(folly::AsyncWriter::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, WriteFlags flags=WriteFlags::NONE) override
Definition: AsyncPipe.cpp:170
void handlerReady(uint16_t events) noexceptoverride
Definition: AsyncPipe.cpp:217
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
std::size_t capacity() const
Definition: IOBuf.h:593
void failAllWrites(const AsyncSocketException &ex)
Definition: AsyncPipe.cpp:205
AsyncReader::ReadCallback * readCallback_
Definition: AsyncPipe.h:88
uint8_t * writableBuffer()
Definition: IOBuf.h:572
void handlerReady(uint16_t events) noexceptoverride
Definition: AsyncPipe.cpp:57
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
ssize_t writeNoInt(int fd, const void *buf, size_t count)
Definition: FileUtil.cpp:114
virtual size_t maxBufferSize() const
void write(std::unique_ptr< folly::IOBuf > iob, AsyncWriter::WriteCallback *wcb=nullptr)
Definition: AsyncPipe.cpp:145
void changeHandlerFD(int fd)
Definition: EventHandler.h:143
const char * string
Definition: Conv.cpp:212
std::function< void(int)> closeCb_
Definition: AsyncPipe.h:89
void failRead(const AsyncSocketException &ex)
Definition: AsyncPipe.cpp:32
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
virtual void writeErr(size_t bytesWritten, const AsyncSocketException &ex) noexcept=0
virtual void readEOF() noexcept=0
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
void append(std::size_t amount)
Definition: IOBuf.h:689
bool isHandlerRegistered() const
Definition: EventHandler.h:112
virtual void readErr(const AsyncSocketException &ex) noexcept=0