19 #include <sys/eventfd.h> 25 #include <boost/intrusive/parent_from_member.hpp> 26 #include <glog/logging.h> 79 io_prep_pread(&
iocb_, fd, buf, size, start);
88 io_prep_preadv(&
iocb_, fd, iov, iovcnt, start);
93 io_prep_pwrite(&
iocb_, fd, const_cast<void*>(buf), size, start);
102 io_prep_pwritev(&
iocb_, fd, iov, iovcnt, start);
122 int rc = io_queue_release(
ctx_);
123 CHECK_EQ(rc, 0) <<
"io_queue_release: " <<
errnoStr(-rc);
131 auto p =
pending_.fetch_add(-1, std::memory_order_acq_rel);
136 if (!
ctxSet_.load(std::memory_order_acquire)) {
138 if (!
ctxSet_.load(std::memory_order_relaxed)) {
142 long aio_nr, aio_max;
143 std::unique_ptr<FILE, int (*)(FILE*)> fp(
144 fopen(
"/proc/sys/fs/aio-nr",
"r"), fclose);
146 CHECK_EQ(fscanf(fp.get(),
"%ld", &aio_nr), 1);
148 std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
149 fopen(
"/proc/sys/fs/aio-max-nr",
"r"), fclose);
151 CHECK_EQ(fscanf(aio_max_fp.get(),
"%ld", &aio_max), 1);
153 LOG(ERROR) <<
"No resources for requested capacity of " <<
capacity_;
154 LOG(ERROR) <<
"aio_nr " << aio_nr <<
", aio_max_nr " << aio_max;
159 ctxSet_.store(
true, std::memory_order_release);
169 auto p =
pending_.fetch_add(1, std::memory_order_acq_rel);
172 throw std::range_error(
"AsyncIO: too many pending requests");
174 iocb* cb = &op->
iocb_;
179 int rc = io_submit(
ctx_, 1, &cb);
191 CHECK_EQ(
pollFd_, -1) <<
"wait() only allowed on non-pollable object";
192 auto p =
pending_.load(std::memory_order_acquire);
193 CHECK_LE(minRequests, p);
199 auto p =
pending_.load(std::memory_order_acquire);
205 CHECK_NE(
pollFd_, -1) <<
"pollCompleted() only allowed on pollable object";
212 }
while (rc == -1 && errno == EINTR);
213 if (
UNLIKELY(rc == -1 && errno == EAGAIN)) {
219 DCHECK_GT(numEvents, 0);
230 std::vector<Op*>& result) {
231 io_event events[maxRequests];
248 }
while (ret == -EINTR);
250 CHECK_GE(ret, 0) <<
"AsyncIO: io_getevents failed with error " 253 }
while (count < minRequests);
254 DCHECK_LE(count, maxRequests);
257 for (
size_t i = 0;
i <
count; ++
i) {
258 DCHECK(events[
i].obj);
259 Op*
op = boost::intrusive::get_parent_from_member(
270 result.push_back(op);
273 return range(result);
283 submit([op]() {
return op; });
297 auto& opFactory =
queue_.front();
298 auto op = opFactory();
302 auto& nextCb =
op->notificationCallback();
303 op->setNotificationCallback([
this, nextCb](
AsyncIOOp* op2) {
330 return "<INVALID AsyncIOOp::State>";
333 const char* iocbCmdToString(
short int cmd_short) {
334 io_iocb_cmd
cmd =
static_cast<io_iocb_cmd
>(cmd_short);
345 return "<INVALID io_iocb_cmd>";
351 std::string path = folly::to<std::string>(
"/proc/self/fd/", fd);
353 const ssize_t length =
354 std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
355 return path.assign(link, length);
358 std::ostream&
operator<<(std::ostream& os,
const iocb& cb) {
360 "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
363 iocbCmdToString(cb.aio_lio_opcode),
366 fd2name(cb.aio_fildes));
368 switch (cb.aio_lio_opcode) {
372 "buf={}, offset={}, nbytes={}, ",
378 os <<
"[TODO: write debug string for " 379 << iocbCmdToString(cb.aio_lio_opcode) <<
"] ";
389 os <<
"{" << op.
state_ <<
", ";
407 return os << asyncIoOpStateToString(state);
void complete(ssize_t result)
std::atomic< size_t > pending_
#define eventfd(initval, flags)
void submit(AsyncIOOp *op)
std::vector< Op * > canceled_
void pwritev(int fd, const iovec *iov, int iovcnt, off_t start)
Range< Op ** > pollCompleted()
std::atomic< size_t > submitted_
std::atomic< bool > ctxSet_
constexpr detail::Map< Move > move
std::function< void(AsyncIOOp *)> NotificationCallback
constexpr size_type size() const
AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE)
—— AsyncIO Symbol Reference ——
void pread(int fd, void *buf, size_t size, off_t start)
void reset(NotificationCallback cb=NotificationCallback())
std::deque< OpFactory > queue_
void preadv(int fd, const iovec *iov, int iovcnt, off_t start)
std::vector< Op * > completed_
constexpr auto size(C const &c) -> decltype(c.size())
void pwrite(int fd, const void *buf, size_t size, off_t start)
std::function< AsyncIOOp *()> OpFactory
std::unique_ptr< AsyncFizzServer::HandshakeCallback > cb_
size_t read(T &out, folly::io::Cursor &cursor)
void checkKernelError(ssize_t ret, Args &&...args)
AsyncIOOp(NotificationCallback cb=NotificationCallback())
constexpr Range< Iter > range(Iter first, Iter last)
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
Range< Op ** > wait(size_t minRequests)
void throwSystemErrorExplicit(int err, const char *msg)
fbstring errnoStr(int err)
void checkUnixError(ssize_t ret, Args &&...args)
constexpr Iter begin() const
Formatter< false, Args... > format(StringPiece fmt, Args &&...args)
void onCompleted(AsyncIOOp *op)
Range< AsyncIO::Op ** > doWait(WaitType type, size_t minRequests, size_t maxRequests, std::vector< Op * > &result)
int close(NetworkSocket s)
AsyncIOQueue(AsyncIO *asyncIO)
std::ostream & operator<<(std::ostream &out, dynamic const &d)