proxygen
AsyncIO.h
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <sys/types.h>
20 
21 #include <atomic>
22 #include <cstdint>
23 #include <deque>
24 #include <functional>
25 #include <iosfwd>
26 #include <mutex>
27 #include <utility>
28 #include <vector>
29 
30 #include <boost/noncopyable.hpp>
31 #include <libaio.h>
32 
33 #include <folly/Portability.h>
34 #include <folly/Range.h>
36 
37 namespace folly {
38 
45 class AsyncIOOp : private boost::noncopyable {
46  friend class AsyncIO;
47  friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
48 
49  public:
50  typedef std::function<void(AsyncIOOp*)> NotificationCallback;
51 
52  explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
53  ~AsyncIOOp();
54 
55  enum class State {
58  PENDING,
59  COMPLETED,
60  CANCELED,
61  };
62 
66  void pread(int fd, void* buf, size_t size, off_t start);
67  void pread(int fd, Range<unsigned char*> range, off_t start);
68  void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
69 
73  void pwrite(int fd, const void* buf, size_t size, off_t start);
74  void pwrite(int fd, Range<const unsigned char*> range, off_t start);
75  void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
76 
80  State state() const {
81  return state_;
82  }
83 
88  void reset(NotificationCallback cb = NotificationCallback());
89 
90  void setNotificationCallback(NotificationCallback cb) {
91  cb_ = std::move(cb);
92  }
93  const NotificationCallback& notificationCallback() const {
94  return cb_;
95  }
96 
105  ssize_t result() const;
106 
107  private:
108  void init();
109  void start();
110  void complete(ssize_t result);
111  void cancel();
112 
113  NotificationCallback cb_;
114  iocb iocb_;
116  ssize_t result_;
117 };
118 
119 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
120 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
121 
125 class AsyncIO : private boost::noncopyable {
126  public:
127  typedef AsyncIOOp Op;
128 
129  enum PollMode {
132  };
133 
154  explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
155  ~AsyncIO();
156 
162  Range<Op**> wait(size_t minRequests);
163 
169 
173  size_t pending() const {
174  return pending_;
175  }
176 
181  size_t capacity() const {
182  return capacity_;
183  }
184 
189  size_t totalSubmits() const {
190  return submitted_;
191  }
192 
198  int pollFd() const {
199  return pollFd_;
200  }
201 
207  Range<Op**> pollCompleted();
208 
212  void submit(Op* op);
213 
214  private:
215  void decrementPending();
216  void initializeContext();
217 
218  enum class WaitType { COMPLETE, CANCEL };
219  Range<AsyncIO::Op**> doWait(
220  WaitType type,
221  size_t minRequests,
222  size_t maxRequests,
223  std::vector<Op*>& result);
224 
225  io_context_t ctx_{nullptr};
226  std::atomic<bool> ctxSet_{false};
228 
229  std::atomic<size_t> pending_{0};
230  std::atomic<size_t> submitted_{0};
231  const size_t capacity_;
232  int pollFd_{-1};
233  std::vector<Op*> completed_;
234  std::vector<Op*> canceled_;
235 };
236 
243  public:
249  explicit AsyncIOQueue(AsyncIO* asyncIO);
250  ~AsyncIOQueue();
251 
252  size_t queued() const {
253  return queue_.size();
254  }
255 
260  void submit(AsyncIOOp* op);
261 
267  typedef std::function<AsyncIOOp*()> OpFactory;
268  void submit(OpFactory op);
269 
270  private:
271  void onCompleted(AsyncIOOp* op);
272  void maybeDequeue();
273 
275 
276  std::deque<OpFactory> queue_;
277 };
278 
279 } // namespace folly
AsyncIO * asyncIO_
Definition: AsyncIO.h:274
AsyncIOOp Op
Definition: AsyncIO.h:127
friend std::ostream & operator<<(std::ostream &stream, const AsyncIOOp &o)
Definition: AsyncIO.cpp:388
void complete(ssize_t result)
Definition: AsyncIO.cpp:58
size_t pending() const
Definition: AsyncIO.h:173
State state() const
Definition: AsyncIO.h:80
const NotificationCallback & notificationCallback() const
Definition: AsyncIO.h:93
std::vector< Op * > canceled_
Definition: AsyncIO.h:234
void pwritev(int fd, const iovec *iov, int iovcnt, off_t start)
Definition: AsyncIO.cpp:100
PskType type
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::function< void(AsyncIOOp *)> NotificationCallback
Definition: AsyncIO.h:50
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
friend class AsyncIO
Definition: AsyncIO.h:46
std::shared_ptr< FizzServerContext > ctx_
std::mutex initMutex_
Definition: AsyncIO.h:227
void pread(int fd, void *buf, size_t size, off_t start)
Definition: AsyncIO.cpp:77
void reset(NotificationCallback cb=NotificationCallback())
Definition: AsyncIO.cpp:41
std::deque< OpFactory > queue_
Definition: AsyncIO.h:276
void preadv(int fd, const iovec *iov, int iovcnt, off_t start)
Definition: AsyncIO.cpp:86
std::vector< Op * > completed_
Definition: AsyncIO.h:233
ssize_t result() const
Definition: AsyncIO.cpp:72
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
void pwrite(int fd, const void *buf, size_t size, off_t start)
Definition: AsyncIO.cpp:91
std::function< AsyncIOOp *()> OpFactory
Definition: AsyncIO.h:267
AsyncIOOp(NotificationCallback cb=NotificationCallback())
Definition: AsyncIO.cpp:36
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
size_t totalSubmits() const
Definition: AsyncIO.h:189
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
int pollFd() const
Definition: AsyncIO.h:198
size_t queued() const
Definition: AsyncIO.h:252
const size_t capacity_
Definition: AsyncIO.h:231
std::mutex mutex
void setNotificationCallback(NotificationCallback cb)
Definition: AsyncIO.h:90
NotificationCallback cb_
Definition: AsyncIO.h:113
size_t capacity() const
Definition: AsyncIO.h:181
state
Definition: http_parser.c:272
ssize_t result_
Definition: AsyncIO.h:116