proxygen
AsyncIOTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22 
23 #include <cstdio>
24 #include <cstdlib>
25 #include <memory>
26 #include <random>
27 #include <thread>
28 #include <vector>
29 
30 #include <glog/logging.h>
31 
32 #include <folly/ScopeGuard.h>
33 #include <folly/String.h>
37 
38 namespace fs = folly::fs;
39 
40 using folly::AsyncIO;
41 using folly::AsyncIOOp;
43 using folly::errnoStr;
44 
45 namespace {
46 
47 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
48 
49 struct TestSpec {
50  off_t start;
51  size_t size;
52 };
53 
54 void waitUntilReadable(int fd) {
55  pollfd pfd;
56  pfd.fd = fd;
57  pfd.events = POLLIN;
58 
59  int r;
60  do {
61  r = poll(&pfd, 1, -1); // wait forever
62  } while (r == -1 && errno == EINTR);
63  PCHECK(r == 1);
64  CHECK_EQ(pfd.revents, POLLIN); // no errors etc
65 }
66 
67 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
68  int fd = reader->pollFd();
69  if (fd == -1) {
70  return reader->wait(1);
71  } else {
72  waitUntilReadable(fd);
73  return reader->pollCompleted();
74  }
75 }
76 
77 // Temporary file that is NOT kept open but is deleted on exit.
78 // Generate random-looking but reproduceable data.
79 class TemporaryFile {
80  public:
81  explicit TemporaryFile(size_t size);
82  ~TemporaryFile();
83 
84  const fs::path path() const {
85  return path_;
86  }
87 
88  private:
89  fs::path path_;
90 };
91 
92 TemporaryFile::TemporaryFile(size_t size)
93  : path_(fs::temp_directory_path() / fs::unique_path()) {
94  CHECK_EQ(size % sizeof(uint32_t), 0);
95  size /= sizeof(uint32_t);
96  const uint32_t seed = 42;
97  std::mt19937 rnd(seed);
98 
99  const size_t bufferSize = 1U << 16;
100  uint32_t buffer[bufferSize];
101 
102  FILE* fp = ::fopen(path_.c_str(), "wb");
103  PCHECK(fp != nullptr);
104  while (size) {
105  size_t n = std::min(size, bufferSize);
106  for (size_t i = 0; i < n; ++i) {
107  buffer[i] = rnd();
108  }
109  size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
110  PCHECK(written == n);
111  size -= written;
112  }
113  PCHECK(::fclose(fp) == 0);
114 }
115 
116 TemporaryFile::~TemporaryFile() {
117  try {
118  fs::remove(path_);
119  } catch (const fs::filesystem_error& e) {
120  LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
121  }
122 }
123 
124 TemporaryFile tempFile(6 << 20); // 6MiB
125 
126 typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
127 ManagedBuffer allocateAligned(size_t size) {
128  void* buf;
129  int rc = posix_memalign(&buf, kAlign, size);
130  CHECK_EQ(rc, 0) << errnoStr(rc);
131  return ManagedBuffer(reinterpret_cast<char*>(buf), free);
132 }
133 
134 void testReadsSerially(
135  const std::vector<TestSpec>& specs,
136  AsyncIO::PollMode pollMode) {
137  AsyncIO aioReader(1, pollMode);
138  AsyncIO::Op op;
139  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
140  PCHECK(fd != -1);
141  SCOPE_EXIT {
142  ::close(fd);
143  };
144 
145  for (size_t i = 0; i < specs.size(); i++) {
146  auto buf = allocateAligned(specs[i].size);
147  op.pread(fd, buf.get(), specs[i].size, specs[i].start);
148  aioReader.submit(&op);
149  EXPECT_EQ((i + 1), aioReader.totalSubmits());
150  EXPECT_EQ(aioReader.pending(), 1);
151  auto ops = readerWait(&aioReader);
152  EXPECT_EQ(1, ops.size());
153  EXPECT_TRUE(ops[0] == &op);
154  EXPECT_EQ(aioReader.pending(), 0);
155  ssize_t res = op.result();
156  EXPECT_LE(0, res) << folly::errnoStr(-res);
157  EXPECT_EQ(specs[i].size, res);
158  op.reset();
159  }
160 }
161 
162 void testReadsParallel(
163  const std::vector<TestSpec>& specs,
164  AsyncIO::PollMode pollMode,
165  bool multithreaded) {
166  AsyncIO aioReader(specs.size(), pollMode);
167  std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
168  std::vector<ManagedBuffer> bufs;
169  bufs.reserve(specs.size());
170 
171  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
172  PCHECK(fd != -1);
173  SCOPE_EXIT {
174  ::close(fd);
175  };
176 
177  std::vector<std::thread> threads;
178  if (multithreaded) {
179  threads.reserve(specs.size());
180  }
181  for (size_t i = 0; i < specs.size(); i++) {
182  bufs.push_back(allocateAligned(specs[i].size));
183  }
184  auto submit = [&](size_t i) {
185  ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
186  aioReader.submit(&ops[i]);
187  };
188  for (size_t i = 0; i < specs.size(); i++) {
189  if (multithreaded) {
190  threads.emplace_back([&submit, i] { submit(i); });
191  } else {
192  submit(i);
193  }
194  }
195  for (auto& t : threads) {
196  t.join();
197  }
198  std::vector<bool> pending(specs.size(), true);
199 
200  size_t remaining = specs.size();
201  while (remaining != 0) {
202  EXPECT_EQ(remaining, aioReader.pending());
203  auto completed = readerWait(&aioReader);
204  size_t nrRead = completed.size();
205  EXPECT_NE(nrRead, 0);
206  remaining -= nrRead;
207 
208  for (size_t i = 0; i < nrRead; i++) {
209  int id = completed[i] - ops.get();
210  EXPECT_GE(id, 0);
211  EXPECT_LT(id, specs.size());
212  EXPECT_TRUE(pending[id]);
213  pending[id] = false;
214  ssize_t res = ops[id].result();
215  EXPECT_LE(0, res) << folly::errnoStr(-res);
216  EXPECT_EQ(specs[id].size, res);
217  }
218  }
219  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
220 
221  EXPECT_EQ(aioReader.pending(), 0);
222  for (size_t i = 0; i < pending.size(); i++) {
223  EXPECT_FALSE(pending[i]);
224  }
225 }
226 
227 void testReadsQueued(
228  const std::vector<TestSpec>& specs,
229  AsyncIO::PollMode pollMode) {
230  size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
231  AsyncIO aioReader(readerCapacity, pollMode);
232  AsyncIOQueue aioQueue(&aioReader);
233  std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
234  std::vector<ManagedBuffer> bufs;
235 
236  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
237  PCHECK(fd != -1);
238  SCOPE_EXIT {
239  ::close(fd);
240  };
241  for (size_t i = 0; i < specs.size(); i++) {
242  bufs.push_back(allocateAligned(specs[i].size));
243  ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
244  aioQueue.submit(&ops[i]);
245  }
246  std::vector<bool> pending(specs.size(), true);
247 
248  size_t remaining = specs.size();
249  while (remaining != 0) {
250  if (remaining >= readerCapacity) {
251  EXPECT_EQ(readerCapacity, aioReader.pending());
252  EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
253  } else {
254  EXPECT_EQ(remaining, aioReader.pending());
255  EXPECT_EQ(0, aioQueue.queued());
256  }
257  auto completed = readerWait(&aioReader);
258  size_t nrRead = completed.size();
259  EXPECT_NE(nrRead, 0);
260  remaining -= nrRead;
261 
262  for (size_t i = 0; i < nrRead; i++) {
263  int id = completed[i] - ops.get();
264  EXPECT_GE(id, 0);
265  EXPECT_LT(id, specs.size());
266  EXPECT_TRUE(pending[id]);
267  pending[id] = false;
268  ssize_t res = ops[id].result();
269  EXPECT_LE(0, res) << folly::errnoStr(-res);
270  EXPECT_EQ(specs[id].size, res);
271  }
272  }
273  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
274  EXPECT_EQ(aioReader.pending(), 0);
275  EXPECT_EQ(aioQueue.queued(), 0);
276  for (size_t i = 0; i < pending.size(); i++) {
277  EXPECT_FALSE(pending[i]);
278  }
279 }
280 
281 void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
282  testReadsSerially(specs, pollMode);
283  testReadsParallel(specs, pollMode, false);
284  testReadsParallel(specs, pollMode, true);
285  testReadsQueued(specs, pollMode);
286 }
287 
288 } // namespace
289 
290 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
291  testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
292 }
293 
294 TEST(AsyncIO, ZeroAsyncDataPollable) {
295  testReads({{0, 0}}, AsyncIO::POLLABLE);
296 }
297 
298 TEST(AsyncIO, SingleAsyncDataNotPollable) {
299  testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
300  testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
301 }
302 
303 TEST(AsyncIO, SingleAsyncDataPollable) {
304  testReads({{0, kAlign}}, AsyncIO::POLLABLE);
305  testReads({{0, kAlign}}, AsyncIO::POLLABLE);
306 }
307 
308 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
309  testReads(
310  {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
311  AsyncIO::NOT_POLLABLE);
312  testReads(
313  {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
314  AsyncIO::NOT_POLLABLE);
315 
316  testReads(
317  {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
318 
319  testReads(
320  {
321  {kAlign, 0},
322  {kAlign, kAlign},
323  {kAlign, 2 * kAlign},
324  {kAlign, 20 * kAlign},
325  {kAlign, 1024 * 1024},
326  },
327  AsyncIO::NOT_POLLABLE);
328 }
329 
330 TEST(AsyncIO, MultipleAsyncDataPollable) {
331  testReads(
332  {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
333  AsyncIO::POLLABLE);
334  testReads(
335  {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
336  AsyncIO::POLLABLE);
337 
338  testReads(
339  {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
340 
341  testReads(
342  {
343  {kAlign, 0},
344  {kAlign, kAlign},
345  {kAlign, 2 * kAlign},
346  {kAlign, 20 * kAlign},
347  {kAlign, 1024 * 1024},
348  },
349  AsyncIO::NOT_POLLABLE);
350 }
351 
352 TEST(AsyncIO, ManyAsyncDataNotPollable) {
353  {
354  std::vector<TestSpec> v;
355  for (int i = 0; i < 1000; i++) {
356  v.push_back({off_t(kAlign * i), kAlign});
357  }
358  testReads(v, AsyncIO::NOT_POLLABLE);
359  }
360 }
361 
362 TEST(AsyncIO, ManyAsyncDataPollable) {
363  {
364  std::vector<TestSpec> v;
365  for (int i = 0; i < 1000; i++) {
366  v.push_back({off_t(kAlign * i), kAlign});
367  }
368  testReads(v, AsyncIO::POLLABLE);
369  }
370 }
371 
372 TEST(AsyncIO, NonBlockingWait) {
373  AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
374  AsyncIO::Op op;
375  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
376  PCHECK(fd != -1);
377  SCOPE_EXIT {
378  ::close(fd);
379  };
380  size_t size = 2 * kAlign;
381  auto buf = allocateAligned(size);
382  op.pread(fd, buf.get(), size, 0);
383  aioReader.submit(&op);
384  EXPECT_EQ(aioReader.pending(), 1);
385 
386  folly::Range<AsyncIO::Op**> completed;
387  while (completed.empty()) {
388  // poll without blocking until the read request completes.
389  completed = aioReader.wait(0);
390  }
391  EXPECT_EQ(completed.size(), 1);
392 
393  EXPECT_TRUE(completed[0] == &op);
394  ssize_t res = op.result();
395  EXPECT_LE(0, res) << folly::errnoStr(-res);
396  EXPECT_EQ(size, res);
397  EXPECT_EQ(aioReader.pending(), 0);
398 }
399 
400 TEST(AsyncIO, Cancel) {
401  constexpr size_t kNumOpsBatch1 = 10;
402  constexpr size_t kNumOpsBatch2 = 10;
403 
404  AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
405  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
406  PCHECK(fd != -1);
407  SCOPE_EXIT {
408  ::close(fd);
409  };
410 
411  size_t completed = 0;
412 
413  std::vector<std::unique_ptr<AsyncIO::Op>> ops;
414  std::vector<ManagedBuffer> bufs;
415  const auto schedule = [&](size_t n) {
416  for (size_t i = 0; i < n; ++i) {
417  const size_t size = 2 * kAlign;
418  bufs.push_back(allocateAligned(size));
419 
420  ops.push_back(std::make_unique<AsyncIO::Op>());
421  auto& op = *ops.back();
422 
423  op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
424  op.pread(fd, bufs.back().get(), size, 0);
425  aioReader.submit(&op);
426  }
427  };
428 
429  // Mix completed and canceled operations for this test.
430  // In order to achieve that, schedule in two batches and do partial
431  // wait() after the first one.
432 
433  schedule(kNumOpsBatch1);
434  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
435  EXPECT_EQ(completed, 0);
436 
437  auto result = aioReader.wait(1);
438  EXPECT_GE(result.size(), 1);
439  EXPECT_EQ(completed, result.size());
440  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
441 
442  schedule(kNumOpsBatch2);
443  EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
444  EXPECT_EQ(completed, result.size());
445 
446  auto canceled = aioReader.cancel();
447  EXPECT_EQ(canceled.size(), ops.size() - result.size());
448  EXPECT_EQ(aioReader.pending(), 0);
449  EXPECT_EQ(completed, result.size());
450 
451  size_t foundCompleted = 0;
452  for (auto& op : ops) {
453  if (op->state() == AsyncIOOp::State::COMPLETED) {
454  ++foundCompleted;
455  } else {
456  EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
457  }
458  }
459  EXPECT_EQ(foundCompleted, completed);
460 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
std::vector< uint8_t > buffer(kBufferSize+16)
size_t pending() const
Definition: AsyncIO.h:173
auto v
LogLevel max
Definition: LogLevel.cpp:31
Definition: __init__.py:1
Range< Op ** > cancel()
Definition: AsyncIO.cpp:197
static const int seed
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
Range< Op ** > pollCompleted()
Definition: AsyncIO.cpp:203
fbstring exceptionStr(const std::exception &e)
TEST(AsyncIO, ZeroAsyncDataNotPollable)
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
std::vector< std::thread::id > threads
void submit(Op *op)
Definition: AsyncIO.cpp:164
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
LogLevel min
Definition: LogLevel.cpp:30
const int ops
Range< Op ** > wait(size_t minRequests)
Definition: AsyncIO.cpp:189
void free()
int pollFd() const
Definition: AsyncIO.h:198
auto start
fbstring errnoStr(int err)
Definition: String.cpp:463
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
int close(NetworkSocket s)
Definition: NetOps.cpp:90
GMockOutputTest ExpectedCall FILE