proxygen
folly::AsyncIO Class Reference

#include <AsyncIO.h>

Inheritance diagram for folly::AsyncIO:

Public Types

enum  PollMode { NOT_POLLABLE, POLLABLE }
 
typedef AsyncIOOp Op
 

Public Member Functions

 AsyncIO (size_t capacity, PollMode pollMode=NOT_POLLABLE)
 
 ~AsyncIO ()
 
Range< Op ** > wait (size_t minRequests)
 
Range< Op ** > cancel ()
 
size_t pending () const
 
size_t capacity () const
 
size_t totalSubmits () const
 
int pollFd () const
 
Range< Op ** > pollCompleted ()
 
void submit (Op *op)
 

Private Types

enum  WaitType { WaitType::COMPLETE, WaitType::CANCEL }
 

Private Member Functions

void decrementPending ()
 
void initializeContext ()
 
Range< AsyncIO::Op ** > doWait (WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
 

Private Attributes

io_context_t ctx_ {nullptr}
 
std::atomic< bool > ctxSet_ {false}
 
std::mutex initMutex_
 
std::atomic< size_t > pending_ {0}
 
std::atomic< size_t > submitted_ {0}
 
const size_t capacity_
 
int pollFd_ {-1}
 
std::vector< Op * > completed_
 
std::vector< Op * > canceled_
 

Detailed Description

C++ interface around Linux Async IO.

Definition at line 125 of file AsyncIO.h.

Member Typedef Documentation

typedef AsyncIOOp folly::AsyncIO::Op

Definition at line 127 of file AsyncIO.h.

Member Enumeration Documentation

enum folly::AsyncIO::PollMode
Enumerator
NOT_POLLABLE 
POLLABLE 

Definition at line 129 of file AsyncIO.h.

enum folly::AsyncIO::WaitType
strong private
Enumerator
COMPLETE 
CANCEL 

Definition at line 218 of file AsyncIO.h.

218 { COMPLETE, CANCEL };

Constructor & Destructor Documentation

folly::AsyncIO::AsyncIO ( size_t  capacity,
PollMode  pollMode = NOT_POLLABLE 
)
explicit

Create an AsyncIO context capable of holding at most 'capacity' pending requests at the same time. As requests complete, others can be scheduled, as long as this limit is not exceeded.

Note: the maximum number of allowed concurrent requests is controlled by the fs.aio-max-nr sysctl; the default value is usually 64K.

If pollMode is POLLABLE, pollFd() will return a file descriptor that can be passed to poll / epoll / select and will become readable when any IOs on this AsyncIO have completed. If you do this, you must use pollCompleted() instead of wait() – do not read from the pollFd() file descriptor directly.

You may use the same AsyncIO object from multiple threads, as long as there is only one concurrent caller of wait() / pollCompleted() / cancel() (perhaps by always calling it from the same thread, or by providing appropriate mutual exclusion). In this case, pending() returns a snapshot of the current number of pending requests.

Definition at line 110 of file AsyncIO.cpp.

References capacity_, folly::checkUnixError(), completed_, EFD_NONBLOCK, eventfd, POLLABLE, and pollFd_.

110  : capacity_(capacity) {
111  CHECK_GT(capacity_, 0);
112  completed_.reserve(capacity_);
113  if (pollMode == POLLABLE) {
115  checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
116  }
117 }
#define eventfd(initval, flags)
std::vector< Op * > completed_
Definition: AsyncIO.h:233
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
const size_t capacity_
Definition: AsyncIO.h:231
size_t capacity() const
Definition: AsyncIO.h:181
folly::AsyncIO::~AsyncIO ( )

Definition at line 119 of file AsyncIO.cpp.

References folly::netops::close(), ctx_, folly::errnoStr(), pending_, and pollFd_.

119  {
120  CHECK_EQ(pending_, 0);
121  if (ctx_) {
122  int rc = io_queue_release(ctx_);
123  CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
124  }
125  if (pollFd_ != -1) {
126  CHECK_ERR(close(pollFd_));
127  }
128 }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
io_context_t ctx_
Definition: AsyncIO.h:225
fbstring errnoStr(int err)
Definition: String.cpp:463
int close(NetworkSocket s)
Definition: NetOps.cpp:90

Member Function Documentation

Range< AsyncIO::Op ** > folly::AsyncIO::cancel ( )

Cancel all pending requests and return them; the returned range is valid until the next call to cancel().

Definition at line 197 of file AsyncIO.cpp.

References CANCEL, canceled_, ctx_, doWait(), and pending_.

Referenced by TEST().

197  {
198  CHECK(ctx_);
199  auto p = pending_.load(std::memory_order_acquire);
200  return doWait(WaitType::CANCEL, p, p, canceled_);
201 }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
std::vector< Op * > canceled_
Definition: AsyncIO.h:234
io_context_t ctx_
Definition: AsyncIO.h:225
Range< AsyncIO::Op ** > doWait(WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
Definition: AsyncIO.cpp:226
size_t folly::AsyncIO::capacity ( ) const
inline

Return the maximum number of requests that can be kept outstanding at any one time.

Definition at line 181 of file AsyncIO.h.

Referenced by folly::AsyncIOQueue::maybeDequeue().

181  {
182  return capacity_;
183  }
const size_t capacity_
Definition: AsyncIO.h:231
void folly::AsyncIO::decrementPending ( )
private

Definition at line 130 of file AsyncIO.cpp.

References pending_.

Referenced by doWait(), and submit().

130  {
131  auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
132  DCHECK_GE(p, 1);
133 }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
Range< AsyncIO::Op ** > folly::AsyncIO::doWait ( WaitType  type,
size_t  minRequests,
size_t  maxRequests,
std::vector< Op * > &  result 
)
private

Definition at line 226 of file AsyncIO.cpp.

References folly::AsyncIOOp::cancel(), CANCEL, folly::AsyncIOOp::complete(), COMPLETE, count, ctx_, decrementPending(), folly::errnoStr(), i, folly::AsyncIOOp::iocb_, and folly::range().

Referenced by cancel(), pollCompleted(), and wait().

230  {
231  io_event events[maxRequests];
232 
233  // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
234  // WaitType::CANCEL we have to wait for IO completion.
235  size_t count = 0;
236  do {
237  int ret;
238  do {
239  // GOTCHA: io_getevents() may return fewer than min_nr results if
240  // interrupted after some events have been read (if before, -EINTR
241  // is returned).
242  ret = io_getevents(
243  ctx_,
244  minRequests - count,
245  maxRequests - count,
246  events + count,
247  /* timeout */ nullptr); // wait forever
248  } while (ret == -EINTR);
249  // Check as may not be able to recover without leaking events.
250  CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
251  << errnoStr(-ret);
252  count += ret;
253  } while (count < minRequests);
254  DCHECK_LE(count, maxRequests);
255 
256  result.clear();
257  for (size_t i = 0; i < count; ++i) {
258  DCHECK(events[i].obj);
259  Op* op = boost::intrusive::get_parent_from_member(
260  events[i].obj, &AsyncIOOp::iocb_);
262  switch (type) {
263  case WaitType::COMPLETE:
264  op->complete(events[i].res);
265  break;
266  case WaitType::CANCEL:
267  op->cancel();
268  break;
269  }
270  result.push_back(op);
271  }
272 
273  return range(result);
274 }
AsyncIOOp Op
Definition: AsyncIO.h:127
PskType type
io_context_t ctx_
Definition: AsyncIO.h:225
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
fbstring errnoStr(int err)
Definition: String.cpp:463
int * count
void decrementPending()
Definition: AsyncIO.cpp:130
void folly::AsyncIO::initializeContext ( )
private

Definition at line 135 of file AsyncIO.cpp.

References capacity_, folly::checkKernelError(), ctx_, ctxSet_, initMutex_, and folly::lock().

Referenced by submit().

135  {
136  if (!ctxSet_.load(std::memory_order_acquire)) {
137  std::lock_guard<std::mutex> lock(initMutex_);
138  if (!ctxSet_.load(std::memory_order_relaxed)) {
139  int rc = io_queue_init(capacity_, &ctx_);
140  // returns negative errno
141  if (rc == -EAGAIN) {
142  long aio_nr, aio_max;
143  std::unique_ptr<FILE, int (*)(FILE*)> fp(
144  fopen("/proc/sys/fs/aio-nr", "r"), fclose);
145  PCHECK(fp);
146  CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
147 
148  std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
149  fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
150  PCHECK(aio_max_fp);
151  CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
152 
153  LOG(ERROR) << "No resources for requested capacity of " << capacity_;
154  LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
155  }
156 
157  checkKernelError(rc, "AsyncIO: io_queue_init failed");
158  DCHECK(ctx_);
159  ctxSet_.store(true, std::memory_order_release);
160  }
161  }
162 }
std::atomic< bool > ctxSet_
Definition: AsyncIO.h:226
io_context_t ctx_
Definition: AsyncIO.h:225
std::mutex initMutex_
Definition: AsyncIO.h:227
void checkKernelError(ssize_t ret, Args &&...args)
Definition: Exception.h:92
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
const size_t capacity_
Definition: AsyncIO.h:231
size_t folly::AsyncIO::pending ( ) const
inline

Return the number of pending requests.

Definition at line 173 of file AsyncIO.h.

Referenced by folly::AsyncIOQueue::maybeDequeue(), TEST(), and folly::AsyncIOQueue::~AsyncIOQueue().

173  {
174  return pending_;
175  }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
Range< AsyncIO::Op ** > folly::AsyncIO::pollCompleted ( )

If POLLABLE, call this instead of wait() after the file descriptor returned by pollFd() becomes readable. The returned range is valid until the next call to pollCompleted().

Definition at line 203 of file AsyncIO.cpp.

References folly::checkUnixError(), COMPLETE, completed_, ctx_, doWait(), pending_, pollFd_, fizz::detail::read(), uint64_t, and UNLIKELY.

203  {
204  CHECK(ctx_);
205  CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
206  uint64_t numEvents;
207  // This sets the eventFd counter to 0, see
208  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
209  ssize_t rc;
210  do {
211  rc = ::read(pollFd_, &numEvents, 8);
212  } while (rc == -1 && errno == EINTR);
213  if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
214  return Range<Op**>(); // nothing completed
215  }
216  checkUnixError(rc, "AsyncIO: read from event fd failed");
217  DCHECK_EQ(rc, 8);
218 
219  DCHECK_GT(numEvents, 0);
220  DCHECK_LE(numEvents, pending_);
221 
222  // Don't reap more than numEvents, as we've just reset the counter to 0.
223  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
224 }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
io_context_t ctx_
Definition: AsyncIO.h:225
std::vector< Op * > completed_
Definition: AsyncIO.h:233
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
#define UNLIKELY(x)
Definition: Likely.h:48
Range< AsyncIO::Op ** > doWait(WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
Definition: AsyncIO.cpp:226
int folly::AsyncIO::pollFd ( ) const
inline

If POLLABLE, return a file descriptor that can be passed to poll / epoll and will become readable when any async IO operations have completed. If NOT_POLLABLE, return -1.

Definition at line 198 of file AsyncIO.h.

References submit.

198  {
199  return pollFd_;
200  }
void folly::AsyncIO::submit ( Op * op)

Submit an op for execution.

Definition at line 164 of file AsyncIO.cpp.

References capacity_, ctx_, decrementPending(), initializeContext(), folly::AsyncIOOp::INITIALIZED, folly::AsyncIOOp::iocb_, pending_, pollFd_, folly::AsyncIOOp::start(), folly::AsyncIOOp::state(), submitted_, and folly::throwSystemErrorExplicit().

Referenced by folly::AsyncIOQueue::maybeDequeue(), and TEST().

164  {
165  CHECK_EQ(op->state(), Op::State::INITIALIZED);
166  initializeContext(); // on demand
167 
168  // We can increment past capacity, but we'll clean up after ourselves.
169  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
170  if (p >= capacity_) {
172  throw std::range_error("AsyncIO: too many pending requests");
173  }
174  iocb* cb = &op->iocb_;
175  cb->data = nullptr; // unused
176  if (pollFd_ != -1) {
177  io_set_eventfd(cb, pollFd_);
178  }
179  int rc = io_submit(ctx_, 1, &cb);
180  if (rc < 0) {
182  throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
183  }
184  submitted_++;
185  DCHECK_EQ(rc, 1);
186  op->start();
187 }
void initializeContext()
Definition: AsyncIO.cpp:135
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
std::atomic< size_t > submitted_
Definition: AsyncIO.h:230
io_context_t ctx_
Definition: AsyncIO.h:225
void throwSystemErrorExplicit(int err, const char *msg)
Definition: Exception.h:65
const size_t capacity_
Definition: AsyncIO.h:231
void decrementPending()
Definition: AsyncIO.cpp:130
size_t folly::AsyncIO::totalSubmits ( ) const
inline

Return the cumulative number of I/O operations submitted since this object was created.

Definition at line 189 of file AsyncIO.h.

189  {
190  return submitted_;
191  }
std::atomic< size_t > submitted_
Definition: AsyncIO.h:230
Range< AsyncIO::Op ** > folly::AsyncIO::wait ( size_t  minRequests)

Wait for at least minRequests to complete. Returns the requests that have completed; the returned range is valid until the next call to wait(). minRequests may be 0 to not block.

Definition at line 189 of file AsyncIO.cpp.

References COMPLETE, completed_, ctx_, doWait(), pending_, and pollFd_.

Referenced by TEST().

189  {
190  CHECK(ctx_);
191  CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
192  auto p = pending_.load(std::memory_order_acquire);
193  CHECK_LE(minRequests, p);
194  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
195 }
std::atomic< size_t > pending_
Definition: AsyncIO.h:229
io_context_t ctx_
Definition: AsyncIO.h:225
std::vector< Op * > completed_
Definition: AsyncIO.h:233
Range< AsyncIO::Op ** > doWait(WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
Definition: AsyncIO.cpp:226

Member Data Documentation

std::vector<Op*> folly::AsyncIO::canceled_
private

Definition at line 234 of file AsyncIO.h.

Referenced by cancel().

const size_t folly::AsyncIO::capacity_
private

Definition at line 231 of file AsyncIO.h.

Referenced by AsyncIO(), initializeContext(), and submit().

std::vector<Op*> folly::AsyncIO::completed_
private

Definition at line 233 of file AsyncIO.h.

Referenced by AsyncIO(), pollCompleted(), and wait().

io_context_t folly::AsyncIO::ctx_ {nullptr}
private

Definition at line 225 of file AsyncIO.h.

Referenced by cancel(), doWait(), initializeContext(), pollCompleted(), submit(), wait(), and ~AsyncIO().

std::atomic<bool> folly::AsyncIO::ctxSet_ {false}
private

Definition at line 226 of file AsyncIO.h.

Referenced by initializeContext().

std::mutex folly::AsyncIO::initMutex_
private

Definition at line 227 of file AsyncIO.h.

Referenced by initializeContext().

std::atomic<size_t> folly::AsyncIO::pending_ {0}
private

Definition at line 229 of file AsyncIO.h.

Referenced by cancel(), decrementPending(), pollCompleted(), submit(), wait(), and ~AsyncIO().

int folly::AsyncIO::pollFd_ {-1}
private

Definition at line 232 of file AsyncIO.h.

Referenced by AsyncIO(), pollCompleted(), submit(), wait(), and ~AsyncIO().

std::atomic<size_t> folly::AsyncIO::submitted_ {0}
private

Definition at line 230 of file AsyncIO.h.

Referenced by submit().


The documentation for this class was generated from the following files: