21 #include <sys/types.h> 30 #include <glog/logging.h> 47 constexpr
size_t kAlign = 4096;
54 void waitUntilReadable(
int fd) {
61 r =
poll(&pfd, 1, -1);
62 }
while (r == -1 && errno == EINTR);
64 CHECK_EQ(pfd.revents, POLLIN);
70 return reader->
wait(1);
72 waitUntilReadable(fd);
81 explicit TemporaryFile(
size_t size);
84 const fs::path path()
const {
92 TemporaryFile::TemporaryFile(
size_t size)
93 : path_(fs::temp_directory_path() / fs::unique_path()) {
94 CHECK_EQ(size %
sizeof(
uint32_t), 0);
97 std::mt19937 rnd(seed);
99 const size_t bufferSize = 1U << 16;
102 FILE* fp = ::fopen(path_.c_str(),
"wb");
103 PCHECK(fp !=
nullptr);
105 size_t n =
std::min(size, bufferSize);
106 for (
size_t i = 0;
i < n; ++
i) {
109 size_t written = ::fwrite(buffer,
sizeof(
uint32_t), n, fp);
110 PCHECK(written == n);
113 PCHECK(::fclose(fp) == 0);
116 TemporaryFile::~TemporaryFile() {
119 }
catch (
const fs::filesystem_error& e) {
124 TemporaryFile tempFile(6 << 20);
126 typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
127 ManagedBuffer allocateAligned(
size_t size) {
129 int rc = posix_memalign(&buf, kAlign, size);
131 return ManagedBuffer(reinterpret_cast<char*>(buf),
free);
134 void testReadsSerially(
135 const std::vector<TestSpec>&
specs,
136 AsyncIO::PollMode pollMode) {
137 AsyncIO aioReader(1, pollMode);
139 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
145 for (
size_t i = 0;
i < specs.size();
i++) {
146 auto buf = allocateAligned(specs[
i].size);
147 op.pread(fd, buf.get(), specs[
i].size, specs[
i].start);
148 aioReader.submit(&op);
151 auto ops = readerWait(&aioReader);
155 ssize_t res = op.result();
162 void testReadsParallel(
163 const std::vector<TestSpec>& specs,
164 AsyncIO::PollMode pollMode,
165 bool multithreaded) {
166 AsyncIO aioReader(specs.size(), pollMode);
167 std::unique_ptr<AsyncIO::Op[]>
ops(
new AsyncIO::Op[specs.size()]);
168 std::vector<ManagedBuffer> bufs;
169 bufs.reserve(specs.size());
171 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
177 std::vector<std::thread>
threads;
179 threads.reserve(specs.size());
181 for (
size_t i = 0;
i < specs.size();
i++) {
182 bufs.push_back(allocateAligned(specs[
i].size));
185 ops[
i].pread(fd, bufs[
i].
get(), specs[
i].size, specs[
i].
start);
186 aioReader.submit(&ops[
i]);
188 for (
size_t i = 0;
i < specs.size();
i++) {
195 for (
auto&
t : threads) {
198 std::vector<bool> pending(specs.size(),
true);
200 size_t remaining = specs.size();
201 while (remaining != 0) {
202 EXPECT_EQ(remaining, aioReader.pending());
203 auto completed = readerWait(&aioReader);
204 size_t nrRead = completed.size();
208 for (
size_t i = 0;
i < nrRead;
i++) {
209 int id = completed[
i] - ops.get();
214 ssize_t res = ops[id].result();
219 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
222 for (
size_t i = 0;
i < pending.size();
i++) {
227 void testReadsQueued(
228 const std::vector<TestSpec>& specs,
229 AsyncIO::PollMode pollMode) {
230 size_t readerCapacity =
std::max(specs.size() / 2, size_t(1));
231 AsyncIO aioReader(readerCapacity, pollMode);
233 std::unique_ptr<AsyncIO::Op[]>
ops(
new AsyncIO::Op[specs.size()]);
234 std::vector<ManagedBuffer> bufs;
236 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
241 for (
size_t i = 0;
i < specs.size();
i++) {
242 bufs.push_back(allocateAligned(specs[
i].size));
243 ops[
i].pread(fd, bufs[
i].
get(), specs[
i].size, specs[
i].start);
244 aioQueue.submit(&ops[
i]);
246 std::vector<bool> pending(specs.size(),
true);
248 size_t remaining = specs.size();
249 while (remaining != 0) {
250 if (remaining >= readerCapacity) {
251 EXPECT_EQ(readerCapacity, aioReader.pending());
252 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
254 EXPECT_EQ(remaining, aioReader.pending());
257 auto completed = readerWait(&aioReader);
258 size_t nrRead = completed.size();
262 for (
size_t i = 0;
i < nrRead;
i++) {
263 int id = completed[
i] - ops.get();
268 ssize_t res = ops[id].result();
273 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
276 for (
size_t i = 0;
i < pending.size();
i++) {
281 void testReads(
const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
282 testReadsSerially(specs, pollMode);
283 testReadsParallel(specs, pollMode,
false);
284 testReadsParallel(specs, pollMode,
true);
285 testReadsQueued(specs, pollMode);
291 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
295 testReads({{0, 0}}, AsyncIO::POLLABLE);
299 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
300 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
304 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
305 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
310 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
311 AsyncIO::NOT_POLLABLE);
313 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
314 AsyncIO::NOT_POLLABLE);
317 {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
323 {kAlign, 2 * kAlign},
324 {kAlign, 20 * kAlign},
325 {kAlign, 1024 * 1024},
327 AsyncIO::NOT_POLLABLE);
332 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
335 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
339 {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
345 {kAlign, 2 * kAlign},
346 {kAlign, 20 * kAlign},
347 {kAlign, 1024 * 1024},
349 AsyncIO::NOT_POLLABLE);
354 std::vector<TestSpec>
v;
355 for (
int i = 0;
i < 1000;
i++) {
356 v.push_back({off_t(kAlign *
i), kAlign});
358 testReads(v, AsyncIO::NOT_POLLABLE);
364 std::vector<TestSpec>
v;
365 for (
int i = 0;
i < 1000;
i++) {
366 v.push_back({off_t(kAlign *
i), kAlign});
368 testReads(v, AsyncIO::POLLABLE);
373 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
375 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
380 size_t size = 2 * kAlign;
381 auto buf = allocateAligned(size);
382 op.pread(fd, buf.get(),
size, 0);
387 while (completed.empty()) {
389 completed = aioReader.
wait(0);
394 ssize_t res = op.result();
401 constexpr
size_t kNumOpsBatch1 = 10;
402 constexpr
size_t kNumOpsBatch2 = 10;
404 AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
405 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
411 size_t completed = 0;
413 std::vector<std::unique_ptr<AsyncIO::Op>>
ops;
414 std::vector<ManagedBuffer> bufs;
415 const auto schedule = [&](
size_t n) {
416 for (
size_t i = 0;
i < n; ++
i) {
417 const size_t size = 2 * kAlign;
418 bufs.push_back(allocateAligned(size));
420 ops.push_back(std::make_unique<AsyncIO::Op>());
421 auto& op = *ops.back();
423 op.setNotificationCallback([&](
AsyncIOOp*) { ++completed; });
424 op.pread(fd, bufs.back().get(),
size, 0);
433 schedule(kNumOpsBatch1);
437 auto result = aioReader.
wait(1);
442 schedule(kNumOpsBatch2);
446 auto canceled = aioReader.
cancel();
447 EXPECT_EQ(canceled.size(), ops.size() - result.size());
451 size_t foundCompleted = 0;
452 for (
auto& op : ops) {
453 if (op->state() == AsyncIOOp::State::COMPLETED) {
456 EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
#define EXPECT_LE(val1, val2)
std::vector< uint8_t > buffer(kBufferSize+16)
#define EXPECT_EQ(val1, val2)
Range< Op ** > pollCompleted()
fbstring exceptionStr(const std::exception &e)
TEST(AsyncIO, ZeroAsyncDataNotPollable)
#define EXPECT_GE(val1, val2)
std::vector< std::thread::id > threads
constexpr auto size(C const &c) -> decltype(c.size())
Range< Op ** > wait(size_t minRequests)
fbstring errnoStr(int err)
#define EXPECT_TRUE(condition)
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
#define EXPECT_NE(val1, val2)
#define EXPECT_FALSE(condition)
#define EXPECT_LT(val1, val2)
int close(NetworkSocket s)
GMockOutputTest ExpectedCall FILE