proxygen
AsyncIO.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <sys/eventfd.h>
20 #include <cerrno>
21 #include <ostream>
22 #include <stdexcept>
23 #include <string>
24 
25 #include <boost/intrusive/parent_from_member.hpp>
26 #include <glog/logging.h>
27 
28 #include <folly/Exception.h>
29 #include <folly/Format.h>
30 #include <folly/Likely.h>
31 #include <folly/String.h>
33 
34 namespace folly {
35 
37  : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
38  memset(&iocb_, 0, sizeof(iocb_));
39 }
40 
42  CHECK_NE(state_, State::PENDING);
43  cb_ = std::move(cb);
45  result_ = -EINVAL;
46  memset(&iocb_, 0, sizeof(iocb_));
47 }
48 
50  CHECK_NE(state_, State::PENDING);
51 }
52 
54  DCHECK_EQ(state_, State::INITIALIZED);
56 }
57 
58 void AsyncIOOp::complete(ssize_t result) {
59  DCHECK_EQ(state_, State::PENDING);
61  result_ = result;
62  if (cb_) {
63  cb_(this);
64  }
65 }
66 
68  DCHECK_EQ(state_, State::PENDING);
70 }
71 
72 ssize_t AsyncIOOp::result() const {
73  CHECK_EQ(state_, State::COMPLETED);
74  return result_;
75 }
76 
77 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
78  init();
79  io_prep_pread(&iocb_, fd, buf, size, start);
80 }
81 
83  pread(fd, range.begin(), range.size(), start);
84 }
85 
86 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
87  init();
88  io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
89 }
90 
91 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
92  init();
93  io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
94 }
95 
97  pwrite(fd, range.begin(), range.size(), start);
98 }
99 
100 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
101  init();
102  io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
103 }
104 
106  CHECK_EQ(state_, State::UNINITIALIZED);
108 }
109 
110 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
111  CHECK_GT(capacity_, 0);
112  completed_.reserve(capacity_);
113  if (pollMode == POLLABLE) {
115  checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
116  }
117 }
118 
120  CHECK_EQ(pending_, 0);
121  if (ctx_) {
122  int rc = io_queue_release(ctx_);
123  CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
124  }
125  if (pollFd_ != -1) {
126  CHECK_ERR(close(pollFd_));
127  }
128 }
129 
131  auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
132  DCHECK_GE(p, 1);
133 }
134 
136  if (!ctxSet_.load(std::memory_order_acquire)) {
137  std::lock_guard<std::mutex> lock(initMutex_);
138  if (!ctxSet_.load(std::memory_order_relaxed)) {
139  int rc = io_queue_init(capacity_, &ctx_);
140  // returns negative errno
141  if (rc == -EAGAIN) {
142  long aio_nr, aio_max;
143  std::unique_ptr<FILE, int (*)(FILE*)> fp(
144  fopen("/proc/sys/fs/aio-nr", "r"), fclose);
145  PCHECK(fp);
146  CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
147 
148  std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
149  fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
150  PCHECK(aio_max_fp);
151  CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
152 
153  LOG(ERROR) << "No resources for requested capacity of " << capacity_;
154  LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
155  }
156 
157  checkKernelError(rc, "AsyncIO: io_queue_init failed");
158  DCHECK(ctx_);
159  ctxSet_.store(true, std::memory_order_release);
160  }
161  }
162 }
163 
165  CHECK_EQ(op->state(), Op::State::INITIALIZED);
166  initializeContext(); // on demand
167 
168  // We can increment past capacity, but we'll clean up after ourselves.
169  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
170  if (p >= capacity_) {
172  throw std::range_error("AsyncIO: too many pending requests");
173  }
174  iocb* cb = &op->iocb_;
175  cb->data = nullptr; // unused
176  if (pollFd_ != -1) {
177  io_set_eventfd(cb, pollFd_);
178  }
179  int rc = io_submit(ctx_, 1, &cb);
180  if (rc < 0) {
182  throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
183  }
184  submitted_++;
185  DCHECK_EQ(rc, 1);
186  op->start();
187 }
188 
189 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
190  CHECK(ctx_);
191  CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
192  auto p = pending_.load(std::memory_order_acquire);
193  CHECK_LE(minRequests, p);
194  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
195 }
196 
198  CHECK(ctx_);
199  auto p = pending_.load(std::memory_order_acquire);
200  return doWait(WaitType::CANCEL, p, p, canceled_);
201 }
202 
204  CHECK(ctx_);
205  CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
206  uint64_t numEvents;
207  // This sets the eventFd counter to 0, see
208  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
209  ssize_t rc;
210  do {
211  rc = ::read(pollFd_, &numEvents, 8);
212  } while (rc == -1 && errno == EINTR);
213  if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
214  return Range<Op**>(); // nothing completed
215  }
216  checkUnixError(rc, "AsyncIO: read from event fd failed");
217  DCHECK_EQ(rc, 8);
218 
219  DCHECK_GT(numEvents, 0);
220  DCHECK_LE(numEvents, pending_);
221 
222  // Don't reap more than numEvents, as we've just reset the counter to 0.
223  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
224 }
225 
227  WaitType type,
228  size_t minRequests,
229  size_t maxRequests,
230  std::vector<Op*>& result) {
231  io_event events[maxRequests];
232 
233  // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
234  // WaitType::CANCEL we have to wait for IO completion.
235  size_t count = 0;
236  do {
237  int ret;
238  do {
239  // GOTCHA: io_getevents() may return fewer than min_nr results if
240  // interrupted after some events have been read (if before, -EINTR
241  // is returned).
242  ret = io_getevents(
243  ctx_,
244  minRequests - count,
245  maxRequests - count,
246  events + count,
247  /* timeout */ nullptr); // wait forever
248  } while (ret == -EINTR);
249  // Check as may not be able to recover without leaking events.
250  CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
251  << errnoStr(-ret);
252  count += ret;
253  } while (count < minRequests);
254  DCHECK_LE(count, maxRequests);
255 
256  result.clear();
257  for (size_t i = 0; i < count; ++i) {
258  DCHECK(events[i].obj);
259  Op* op = boost::intrusive::get_parent_from_member(
260  events[i].obj, &AsyncIOOp::iocb_);
262  switch (type) {
263  case WaitType::COMPLETE:
264  op->complete(events[i].res);
265  break;
266  case WaitType::CANCEL:
267  op->cancel();
268  break;
269  }
270  result.push_back(op);
271  }
272 
273  return range(result);
274 }
275 
276 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
277 
279  CHECK_EQ(asyncIO_->pending(), 0);
280 }
281 
283  submit([op]() { return op; });
284 }
285 
287  queue_.push_back(op);
288  maybeDequeue();
289 }
290 
292  maybeDequeue();
293 }
294 
296  while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
297  auto& opFactory = queue_.front();
298  auto op = opFactory();
299  queue_.pop_front();
300 
301  // Interpose our completion callback
302  auto& nextCb = op->notificationCallback();
303  op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
304  this->onCompleted(op2);
305  if (nextCb) {
306  nextCb(op2);
307  }
308  });
309 
310  asyncIO_->submit(op);
311  }
312 }
313 
314 // debugging helpers:
315 
316 namespace {
317 
318 #define X(c) \
319  case c: \
320  return #c
321 
322 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
323  switch (state) {
329  }
330  return "<INVALID AsyncIOOp::State>";
331 }
332 
333 const char* iocbCmdToString(short int cmd_short) {
334  io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
335  switch (cmd) {
336  X(IO_CMD_PREAD);
337  X(IO_CMD_PWRITE);
338  X(IO_CMD_FSYNC);
339  X(IO_CMD_FDSYNC);
340  X(IO_CMD_POLL);
341  X(IO_CMD_NOOP);
342  X(IO_CMD_PREADV);
343  X(IO_CMD_PWRITEV);
344  };
345  return "<INVALID io_iocb_cmd>";
346 }
347 
348 #undef X
349 
// Resolves a file descriptor to the path it refers to, for debug output.
//
// Reads the symlink /proc/self/fd/<fd>. Returns the link target, or an empty
// string when readlink() fails (for example, the descriptor is not open).
// At most PATH_MAX bytes of the target are returned.
std::string fd2name(int fd) {
  const std::string procEntry = "/proc/self/fd/" + std::to_string(fd);

  char target[PATH_MAX];
  const ssize_t written = readlink(procEntry.c_str(), target, PATH_MAX);
  if (written <= 0) {
    // Error (-1) or an empty link: both yield an empty name.
    return std::string();
  }
  // readlink() does not NUL-terminate, so build the string from the length.
  return std::string(target, static_cast<size_t>(written));
}
357 
358 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
359  os << folly::format(
360  "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
361  cb.data,
362  cb.key,
363  iocbCmdToString(cb.aio_lio_opcode),
364  cb.aio_reqprio,
365  cb.aio_fildes,
366  fd2name(cb.aio_fildes));
367 
368  switch (cb.aio_lio_opcode) {
369  case IO_CMD_PREAD:
370  case IO_CMD_PWRITE:
371  os << folly::format(
372  "buf={}, offset={}, nbytes={}, ",
373  cb.u.c.buf,
374  cb.u.c.offset,
375  cb.u.c.nbytes);
376  break;
377  default:
378  os << "[TODO: write debug string for "
379  << iocbCmdToString(cb.aio_lio_opcode) << "] ";
380  break;
381  }
382 
383  return os;
384 }
385 
386 } // namespace
387 
388 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
389  os << "{" << op.state_ << ", ";
390 
392  os << op.iocb_;
393  }
394 
396  os << "result=" << op.result_;
397  if (op.result_ < 0) {
398  os << " (" << errnoStr(-op.result_) << ')';
399  }
400  os << ", ";
401  }
402 
403  return os << "}";
404 }
405 
406 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
407  return os << asyncIoOpStateToString(state);
408 }
409 
410 } // namespace folly
AsyncIO * asyncIO_
Definition: AsyncIO.h:274
void complete(ssize_t result)
Definition: AsyncIO.cpp:58
void initializeContext()
Definition: AsyncIO.cpp:135
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
size_t pending() const
Definition: AsyncIO.h:173
State state() const
Definition: AsyncIO.h:80
#define eventfd(initval, flags)
void submit(AsyncIOOp *op)
Definition: AsyncIO.cpp:282
std::vector< Op * > canceled_
Definition: AsyncIO.h:234
Range< Op ** > cancel()
Definition: AsyncIO.cpp:197
void pwritev(int fd, const iovec *iov, int iovcnt, off_t start)
Definition: AsyncIO.cpp:100
PskType type
Range< Op ** > pollCompleted()
Definition: AsyncIO.cpp:203
std::atomic< size_t > submitted_
Definition: AsyncIO.h:230
std::atomic< bool > ctxSet_
Definition: AsyncIO.h:226
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::function< void(AsyncIOOp *)> NotificationCallback
Definition: AsyncIO.h:50
STL namespace.
io_context_t ctx_
Definition: AsyncIO.h:225
constexpr size_type size() const
Definition: Range.h:431
#define X(c)
Definition: AsyncIO.cpp:318
AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE)
Definition: AsyncIO.cpp:110
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::mutex initMutex_
Definition: AsyncIO.h:227
void pread(int fd, void *buf, size_t size, off_t start)
Definition: AsyncIO.cpp:77
void reset(NotificationCallback cb=NotificationCallback())
Definition: AsyncIO.cpp:41
std::deque< OpFactory > queue_
Definition: AsyncIO.h:276
void preadv(int fd, const iovec *iov, int iovcnt, off_t start)
Definition: AsyncIO.cpp:86
std::vector< Op * > completed_
Definition: AsyncIO.h:233
ssize_t result() const
Definition: AsyncIO.cpp:72
void submit(Op *op)
Definition: AsyncIO.cpp:164
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
void pwrite(int fd, const void *buf, size_t size, off_t start)
Definition: AsyncIO.cpp:91
std::function< AsyncIOOp *()> OpFactory
Definition: AsyncIO.h:267
std::unique_ptr< AsyncFizzServer::HandshakeCallback > cb_
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
void checkKernelError(ssize_t ret, Args &&...args)
Definition: Exception.h:92
AsyncIOOp(NotificationCallback cb=NotificationCallback())
Definition: AsyncIO.cpp:36
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
Range< Op ** > wait(size_t minRequests)
Definition: AsyncIO.cpp:189
void throwSystemErrorExplicit(int err, const char *msg)
Definition: Exception.h:65
fbstring errnoStr(int err)
Definition: String.cpp:463
int * count
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
cmd
Definition: gtest-cfgcmd.txt:1
constexpr Iter begin() const
Definition: Range.h:452
const size_t capacity_
Definition: AsyncIO.h:231
const char * string
Definition: Conv.cpp:212
void decrementPending()
Definition: AsyncIO.cpp:130
Formatter< false, Args... > format(StringPiece fmt, Args &&...args)
Definition: Format.h:271
void onCompleted(AsyncIOOp *op)
Definition: AsyncIO.cpp:291
#define UNLIKELY(x)
Definition: Likely.h:48
Range< AsyncIO::Op ** > doWait(WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
Definition: AsyncIO.cpp:226
NotificationCallback cb_
Definition: AsyncIO.h:113
int close(NetworkSocket s)
Definition: NetOps.cpp:90
size_t capacity() const
Definition: AsyncIO.h:181
state
Definition: http_parser.c:272
ssize_t result_
Definition: AsyncIO.h:116
AsyncIOQueue(AsyncIO *asyncIO)
Definition: AsyncIO.cpp:276
std::ostream & operator<<(std::ostream &out, dynamic const &d)
Definition: dynamic-inl.h:1158