proxygen
AsyncFileWriterTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <thread>
17 
18 #include <folly/Conv.h>
19 #include <folly/Exception.h>
20 #include <folly/File.h>
21 #include <folly/FileUtil.h>
22 #include <folly/String.h>
23 #include <folly/Synchronized.h>
25 #include <folly/futures/Future.h>
26 #include <folly/futures/Promise.h>
27 #include <folly/init/Init.h>
28 #include <folly/lang/SafeAssert.h>
30 #include <folly/logging/Init.h>
31 #include <folly/logging/LoggerDB.h>
32 #include <folly/logging/xlog.h>
38 #include <folly/system/ThreadId.h>
40 #include <folly/test/TestUtils.h>
41 
43  async_discard_num_normal_writers,
44  30,
45  "number of threads to use to generate normal log messages during "
46  "the AsyncFileWriter.discard test");
48  async_discard_num_nodiscard_writers,
49  2,
50  "number of threads to use to generate non-discardable log messages during "
51  "the AsyncFileWriter.discard test");
53  async_discard_read_sleep_usec,
54  500,
55  "how long the read thread should sleep between reads in "
56  "the AsyncFileWriter.discard test");
58  async_discard_timeout_msec,
59  10000,
60  "A timeout for the AsyncFileWriter.discard test if it cannot generate "
61  "enough discards");
63  async_discard_num_events,
64  10,
65  "The number of discard events to wait for in the AsyncFileWriter.discard "
66  "test");
67 
68 using namespace folly;
69 using namespace std::literals::chrono_literals;
71 using std::chrono::milliseconds;
72 using std::chrono::steady_clock;
74 
75 TEST(AsyncFileWriter, noMessages) {
76  TemporaryFile tmpFile{"logging_test"};
77 
78  // Test the simple construction and destruction of an AsyncFileWriter
79  // without ever writing any messages. This still exercises the I/O
80  // thread start-up and shutdown code.
81  AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
82 }
83 
84 TEST(AsyncFileWriter, simpleMessages) {
85  TemporaryFile tmpFile{"logging_test"};
86 
87  {
88  AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
89  for (int n = 0; n < 10; ++n) {
90  writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
92  }
93  }
94  tmpFile.close();
95 
97  auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
98  ASSERT_TRUE(ret);
99 
100  std::string expected =
101  "message 0\n"
102  "message 1\n"
103  "message 2\n"
104  "message 3\n"
105  "message 4\n"
106  "message 5\n"
107  "message 6\n"
108  "message 7\n"
109  "message 8\n"
110  "message 9\n";
111  EXPECT_EQ(expected, data);
112 }
113 
114 namespace {
115 static std::vector<std::string>* internalWarnings;
116 
117 void handleLoggingError(
118  StringPiece /* file */,
119  int /* lineNumber */,
120  std::string&& msg) {
121  internalWarnings->emplace_back(std::move(msg));
122 }
123 } // namespace
124 
126  // Set the LoggerDB internal warning handler so we can record the messages
127  std::vector<std::string> logErrors;
128  internalWarnings = &logErrors;
129  LoggerDB::setInternalWarningHandler(handleLoggingError);
130 
131  // Create an AsyncFileWriter that refers to a pipe whose read end is closed
132  std::array<int, 2> fds;
133  auto rc = pipe(fds.data());
134  folly::checkUnixError(rc, "failed to create pipe");
135 #ifndef _WIN32
136  signal(SIGPIPE, SIG_IGN);
137 #endif
138  ::close(fds[0]);
139 
140  // Log a bunch of messages to the writer
141  size_t numMessages = 100;
142  {
143  AsyncFileWriter writer{folly::File{fds[1], true}};
144  for (size_t n = 0; n < numMessages; ++n) {
145  writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
147  }
148  }
149 
151 
152  // AsyncFileWriter should have some internal warning messages about the
153  // log failures. This will generally be many fewer than the number of
154  // messages we wrote, though, since it performs write batching.
155  //
156  // GTest on Windows doesn't support alternation in the regex syntax -_-....
157  const std::string kExpectedErrorMessage =
158 #if _WIN32
159  // The `pipe` call above is actually implemented via sockets, so we get
160  // a different error message.
161  "An established connection was aborted by the software in your host machine\\.";
162 #else
163  "Broken pipe";
164 #endif
165 
166  for (const auto& msg : logErrors) {
167  EXPECT_THAT(
168  msg,
170  "error writing to log file .* in AsyncFileWriter.*: " +
171  kExpectedErrorMessage));
172  }
173  EXPECT_GT(logErrors.size(), 0);
174  EXPECT_LE(logErrors.size(), numMessages);
175 }
176 
177 namespace {
178 size_t fillUpPipe(int fd) {
179  int flags = fcntl(fd, F_GETFL);
180  folly::checkUnixError(flags, "failed get file descriptor flags");
181  auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
182  folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
183  std::vector<char> data;
184  data.resize(4000);
185  size_t totalBytes = 0;
186  size_t bytesToWrite = data.size();
187  while (true) {
188  auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
189  if (bytesWritten < 0) {
190  if (errno == EAGAIN || errno == EWOULDBLOCK) {
191  // We blocked. Keep trying smaller writes, until we get down to a
192  // single byte, just to make sure the logging code really won't be able
193  // to write anything to the pipe.
194  if (bytesToWrite <= 1) {
195  break;
196  } else {
197  bytesToWrite /= 2;
198  }
199  } else {
200  throwSystemError("error writing to pipe");
201  }
202  } else {
203  totalBytes += bytesWritten;
204  }
205  }
206  XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
207 
208  rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
209  folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
210 
211  return totalBytes;
212 }
213 } // namespace
214 
216  // Set up a pipe(), then write data to the write endpoint until it fills up
217  // and starts blocking.
218  std::array<int, 2> fds;
219  auto rc = pipe(fds.data());
220  folly::checkUnixError(rc, "failed to create pipe");
221  File readPipe{fds[0], true};
222  File writePipe{fds[1], true};
223 
224  auto paddingSize = fillUpPipe(writePipe.fd());
225 
226  // Now set up an AsyncFileWriter pointing at the write end of the pipe
227  AsyncFileWriter writer{std::move(writePipe)};
228 
229  // Write a message
230  writer.writeMessage("test message: " + std::string(200, 'x'));
231 
232  // Call flush(). Use a separate thread, since this should block until we
233  // consume data from the pipe.
234  Promise<Unit> promise;
235  auto future = promise.getFuture();
236  auto flushFunction = [&] { writer.flush(); };
237  std::thread flushThread{
238  [&]() { promise.setTry(makeTryWith(flushFunction)); }};
239  // Detach the flush thread now rather than joining it at the end of the
240  // function. This way if something goes wrong during the test we will fail
241  // with the real error, rather than crashing due to the std::thread
242  // destructor running on a still-joinable thread.
243  flushThread.detach();
244 
245  // Sleep briefly, and make sure flush() still hasn't completed.
246  // If it has completed this doesn't necessarily indicate a bug in
247  // AsyncFileWriter, but instead indicates that our test code failed to
248  // successfully cause a blocking write.
249  /* sleep override */
250  std::this_thread::sleep_for(10ms);
251  EXPECT_FALSE(future.isReady());
252 
253  // Now read from the pipe
254  std::vector<char> buf;
255  buf.resize(paddingSize);
256  auto bytesRead = readFull(readPipe.fd(), buf.data(), buf.size());
257  EXPECT_EQ(bytesRead, paddingSize);
258 
259  // Make sure flush completes successfully now
260  std::move(future).get(10ms);
261 }
262 
263 // A large-ish message suffix, just to consume space and help fill up
264 // log buffers faster.
265 static constexpr StringPiece kMsgSuffix{
266  "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
267  "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
268  "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
269  "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
270 
271 class ReadStats {
272  public:
274  : deadline_{steady_clock::now() +
275  milliseconds{FLAGS_async_discard_timeout_msec}},
276  readSleepUS_{static_cast<uint64_t>(
277  std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
278 
280  readSleepUS_.store(0);
281  }
282  std::chrono::microseconds getSleepUS() const {
283  return std::chrono::microseconds{readSleepUS_.load()};
284  }
285 
286  bool shouldWriterStop() const {
287  // Stop after we have seen the required number of separate discard events.
288  // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
289  // ensures the async writer blocks and then makes progress again multiple
290  // times.
291  if (FLAGS_async_discard_num_events > 0 &&
292  discardEventsSeen_.load() >
293  static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
294  return true;
295  }
296 
297  // Stop after a timeout, even if we don't hit the number of requested
298  // discards.
299  return steady_clock::now() > deadline_;
300  }
301  void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
302  auto map = perThreadWriteData_.wlock();
304  map->find(threadID) == map->end(),
305  "multiple writer threads with same ID");
306  auto& data = (*map)[threadID];
307  data.numMessagesWritten = messagesWritten;
308  data.flags = flags;
309  }
310 
311  void check() {
312  auto writeDataMap = perThreadWriteData_.wlock();
313 
314  EXPECT_EQ("", trailingData_);
315  EXPECT_EQ(0, numUnableToParse_);
316  EXPECT_EQ(0, numOutOfOrder_);
317 
318  // Check messages received from each writer thread
319  size_t readerStatsChecked = 0;
320  size_t totalMessagesWritten = 0;
321  size_t totalMessagesRead = 0;
322  for (const auto& writeEntry : *writeDataMap) {
323  const auto& writeInfo = writeEntry.second;
324  totalMessagesWritten += writeInfo.numMessagesWritten;
325 
326  auto iter = perThreadReadData_.find(writeEntry.first);
327  if (iter == perThreadReadData_.end()) {
328  // We never received any messages from this writer thread.
329  // This is okay as long as this is not a NEVER_DISCARD writer.
330  EXPECT_EQ(0, writeInfo.flags);
331  continue;
332  }
333  const auto& readInfo = iter->second;
334  ++readerStatsChecked;
335  totalMessagesRead += readInfo.numMessagesRead;
336  if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
337  // Non-discarding threads should never discard anything
338  EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
339  EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
340  } else {
341  // Other threads may have discarded some messages
342  EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
343  EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
344  }
345  }
346 
347  EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
348  EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
349 
350  // This test is intended to check the discard behavior.
351  // Fail the test if we didn't actually trigger any discards before we timed
352  // out.
353  EXPECT_GT(numDiscarded_, 0);
354 
355  XLOG(DBG1) << totalMessagesWritten << " messages written, "
356  << totalMessagesRead << " messages read, " << numDiscarded_
357  << " messages discarded";
358  }
359 
361  if (msg.endsWith(" log messages discarded: "
362  "logging faster than we can write")) {
363  auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
364  XLOG(DBG3, "received discard notification: ", discardCount);
365  numDiscarded_ += discardCount;
366  ++discardEventsSeen_;
367  return;
368  }
369 
370  size_t threadID = 0;
371  size_t messageIndex = 0;
372  try {
373  parseMessage(msg, &threadID, &messageIndex);
374  } catch (const std::exception& ex) {
375  ++numUnableToParse_;
376  XLOG(ERR, "unable to parse log message: ", msg);
377  return;
378  }
379 
380  auto& data = perThreadReadData_[threadID];
381  data.numMessagesRead++;
382  if (messageIndex > data.lastId) {
383  data.lastId = messageIndex;
384  } else {
385  ++numOutOfOrder_;
386  XLOG(ERR) << "received out-of-order messages from writer " << threadID
387  << ": " << messageIndex << " received after " << data.lastId;
388  }
389  }
390 
392  trailingData_ = data.str();
393  }
394 
395  private:
396  struct ReaderData {
397  size_t numMessagesRead{0};
398  size_t lastId{0};
399  };
400  struct WriterData {
401  size_t numMessagesWritten{0};
402  int flags{0};
403  };
404 
405  void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
406  // Validate and strip off the message prefix and suffix
407  constexpr StringPiece prefix{"thread "};
408  if (!msg.startsWith(prefix)) {
409  throw std::runtime_error("bad message prefix");
410  }
411  msg.advance(prefix.size());
412  if (!msg.endsWith(kMsgSuffix)) {
413  throw std::runtime_error("bad message suffix");
414  }
415  msg.subtract(kMsgSuffix.size());
416 
417  // Parse then strip off the thread index
418  auto threadIDEnd = msg.find(' ');
419  if (threadIDEnd == StringPiece::npos) {
420  throw std::runtime_error("no middle found");
421  }
422  *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
423  msg.advance(threadIDEnd);
424 
425  // Validate that the middle of the message is what we expect,
426  // then strip it off
427  constexpr StringPiece middle{" message "};
428  if (!msg.startsWith(middle)) {
429  throw std::runtime_error("bad message middle");
430  }
431  msg.advance(middle.size());
432 
433  // Parse the message index
434  *messageIndex = folly::to<size_t>(msg);
435  }
436 
450  std::unordered_map<size_t, ReaderData> perThreadReadData_;
451 
452  /*
453  * Additional information recorded by the reader thread.
454  */
456  size_t numUnableToParse_{0};
457  size_t numOutOfOrder_{0};
458  size_t numDiscarded_{0};
459 
466  const std::chrono::steady_clock::time_point deadline_;
467 
479  std::atomic<uint64_t> readSleepUS_{0};
480 
490  std::atomic<uint64_t> discardEventsSeen_{0};
491 
500 };
501 
506 void readThread(folly::File&& file, ReadStats* stats) {
507  std::vector<char> buffer;
508  buffer.resize(1024);
509 
510  size_t bufferIdx = 0;
511  while (true) {
512  /* sleep override */
513  std::this_thread::sleep_for(stats->getSleepUS());
514 
515  auto readResult = folly::readNoInt(
516  file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
517  if (readResult < 0) {
518  XLOG(ERR, "error reading from pipe: ", errno);
519  return;
520  }
521  if (readResult == 0) {
522  XLOG(DBG2, "read EOF");
523  break;
524  }
525 
526  auto logDataLen = bufferIdx + readResult;
527  StringPiece logData{buffer.data(), logDataLen};
528  auto idx = 0;
529  while (true) {
530  auto end = logData.find('\n', idx);
531  if (end == StringPiece::npos) {
532  bufferIdx = logDataLen - idx;
533  memmove(buffer.data(), buffer.data() + idx, bufferIdx);
534  break;
535  }
536 
537  StringPiece logMsg{logData.data() + idx, end - idx};
538  stats->messageReceived(logMsg);
539  idx = end + 1;
540  }
541  }
542 
543  if (bufferIdx != 0) {
544  stats->trailingData(StringPiece{buffer.data(), bufferIdx});
545  }
546 }
547 
552  AsyncFileWriter* writer,
553  size_t id,
554  uint32_t flags,
555  ReadStats* readStats) {
556  size_t msgID = 0;
557  while (true) {
558  ++msgID;
559  writer->writeMessage(
560  folly::to<std::string>(
561  "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
562  flags);
563 
564  // Break out once the reader has seen enough discards
565  if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
566  readStats->writerFinished(id, msgID, flags);
567  break;
568  }
569  }
570 }
571 
572 /*
573  * The discard test spawns a number of threads that each write a large number
574  * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
575  * thread reads from it slowly, causing a backlog to build up.
576  *
577  * The test then checks that:
578  * - The read thread always receives full messages (no partial log messages)
579  * - Messages that are received are received in order
580  * - The number of messages received plus the number reported in discard
581  * notifications matches the number of messages sent.
582  */
584  std::array<int, 2> fds;
585  auto pipeResult = pipe(fds.data());
586  folly::checkUnixError(pipeResult, "pipe failed");
587  folly::File readPipe{fds[0], true};
588  folly::File writePipe{fds[1], true};
589 
590  ReadStats readStats;
591  std::thread reader(readThread, std::move(readPipe), &readStats);
592  {
593  AsyncFileWriter writer{std::move(writePipe)};
594 
595  std::vector<std::thread> writeThreads;
596  size_t numThreads = FLAGS_async_discard_num_normal_writers +
597  FLAGS_async_discard_num_nodiscard_writers;
598 
599  for (size_t n = 0; n < numThreads; ++n) {
600  uint32_t flags = 0;
601  if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
602  flags = LogWriter::NEVER_DISCARD;
603  }
604  XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
605 
606  writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
607  }
608 
609  for (auto& t : writeThreads) {
610  t.join();
611  }
612  XLOG(DBG2, "writers done");
613  }
614  // Clear the read sleep duration so the reader will finish quickly now
615  readStats.clearSleepDuration();
616  reader.join();
617  readStats.check();
618 }
619 
625 #if FOLLY_HAVE_PTHREAD_ATFORK
626  TemporaryFile tmpFile{"logging_test"};
627 
628  // The number of messages to send before the fork and from each process
629  constexpr size_t numMessages = 10;
630  constexpr size_t numBgThreads = 2;
631 
632  // This can be increased to add some delay in the parent and child messages
633  // so that they are likely to be interleaved in the log rather than grouped
634  // together. This doesn't really affect the test behavior or correctness
635  // otherwise, though.
636  constexpr milliseconds sleepDuration(0);
637 
638  {
639  AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
640  writer.writeMessage(folly::to<std::string>("parent pid=", getpid(), "\n"));
641 
642  // Start some background threads just to exercise the behavior
643  // when other threads are also logging to the writer when the fork occurs
644  std::vector<std::thread> bgThreads;
645  std::atomic<bool> stop{false};
646  for (size_t n = 0; n < numBgThreads; ++n) {
647  bgThreads.emplace_back([&] {
648  size_t iter = 0;
649  while (!stop) {
650  writer.writeMessage(
651  folly::to<std::string>("bgthread_", getpid(), "_", iter, "\n"));
652  ++iter;
653  }
654  });
655  }
656 
657  for (size_t n = 0; n < numMessages; ++n) {
658  writer.writeMessage(folly::to<std::string>("prefork", n, "\n"));
659  }
660 
661  auto pid = fork();
662  folly::checkUnixError(pid, "failed to fork");
663  if (pid == 0) {
664  writer.writeMessage(folly::to<std::string>("child pid=", getpid(), "\n"));
665  for (size_t n = 0; n < numMessages; ++n) {
666  writer.writeMessage(folly::to<std::string>("child", n, "\n"));
667  std::this_thread::sleep_for(sleepDuration);
668  }
669 
670  // Use _exit() rather than exit() in the child, purely to prevent
671  // ASAN from complaining that we leak memory for the background threads.
672  // (These threads don't actually exist in the child, so it is difficult
673  // to clean up their allocated state entirely.)
674  //
675  // Explicitly flush the writer since exiting with _exit() won't do this
676  // automatically.
677  writer.flush();
678  _exit(0);
679  }
680 
681  for (size_t n = 0; n < numMessages; ++n) {
682  writer.writeMessage(folly::to<std::string>("parent", n, "\n"));
683  std::this_thread::sleep_for(sleepDuration);
684  }
685 
686  // Stop the background threads.
687  stop = true;
688  for (auto& t : bgThreads) {
689  t.join();
690  }
691 
692  int status;
693  auto waited = waitpid(pid, &status, 0);
694  folly::checkUnixError(waited, "failed to wait on child");
695  ASSERT_EQ(waited, pid);
696  }
697 
698  // Read back the logged messages
699  tmpFile.close();
701  auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
702  ASSERT_TRUE(ret) << "failed to read log file";
703 
704  XLOG(DBG1) << "log contents:\n" << data;
705 
706  // The log file should contain all of the messages we wrote, from both the
707  // parent and child processes.
708  for (size_t n = 0; n < numMessages; ++n) {
709  EXPECT_THAT(
710  data, ContainsRegex(folly::to<std::string>("prefork", n, "\n")));
711  EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("parent", n, "\n")));
712  EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("child", n, "\n")));
713  }
714 #else
715  SKIP() << "pthread_atfork() is not supported on this platform";
716 #endif // FOLLY_HAVE_PTHREAD_ATFORK
717 }
718 
726 TEST(AsyncFileWriter, crazyForks) {
727 #if FOLLY_HAVE_PTHREAD_ATFORK
728  constexpr size_t numAsyncWriterThreads = 10;
729  constexpr size_t numForkThreads = 5;
730  constexpr size_t numForkIterations = 20;
731  std::atomic<bool> stop{false};
732 
733  // Spawn several threads that continuously create and destroy
734  // AsyncFileWriter objects.
735  std::vector<std::thread> asyncWriterThreads;
736  for (size_t n = 0; n < numAsyncWriterThreads; ++n) {
737  asyncWriterThreads.emplace_back([n, &stop] {
738  folly::setThreadName(folly::to<std::string>("async_", n));
739 
740  TemporaryFile tmpFile{"logging_test"};
741  while (!stop) {
742  // Create an AsyncFileWriter, write a message to it, then destroy it.
743  AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
744  writer.writeMessage(folly::to<std::string>(
745  "async thread ", folly::getOSThreadID(), "\n"));
746  }
747  });
748  }
749 
750  // Spawn several threads that repeatedly fork.
751  std::vector<std::thread> forkThreads;
752  std::mutex forkStartMutex;
753  std::condition_variable forkStartCV;
754  bool forkStart = false;
755  for (size_t n = 0; n < numForkThreads; ++n) {
756  forkThreads.emplace_back([n, &forkStartMutex, &forkStartCV, &forkStart] {
757  folly::setThreadName(folly::to<std::string>("fork_", n));
758 
759  // Wait until forkStart is set just to have a better chance of all the
760  // fork threads running simultaneously.
761  {
762  std::unique_lock<std::mutex> l(forkStartMutex);
763  forkStartCV.wait(l, [&forkStart] { return forkStart; });
764  }
765 
766  for (size_t i = 0; i < numForkIterations; ++i) {
767  XLOG(DBG3) << "fork " << n << ":" << i;
768  auto pid = fork();
769  folly::checkUnixError(pid, "forkFailed");
770  if (pid == 0) {
771  XLOG(DBG3) << "child " << getpid();
772  _exit(0);
773  }
774 
775  // parent
776  int status;
777  auto waited = waitpid(pid, &status, 0);
778  folly::checkUnixError(waited, "failed to wait on child");
779  EXPECT_EQ(waited, pid);
780  }
781  });
782  }
783 
784  // Kick off the fork threads
785  {
786  std::unique_lock<std::mutex> l(forkStartMutex);
787  forkStart = true;
788  }
789  forkStartCV.notify_all();
790 
791  // Wait for the fork threads to finish
792  for (auto& t : forkThreads) {
793  t.join();
794  }
795 
796  // Stop and wait for the AsyncFileWriter threads
797  stop = true;
798  for (auto& t : asyncWriterThreads) {
799  t.join();
800  }
801 #else
802  SKIP() << "pthread_atfork() is not supported on this platform";
803 #endif // FOLLY_HAVE_PTHREAD_ATFORK
804 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
std::vector< uint8_t > buffer(kBufferSize+16)
void writeMessage(folly::StringPiece buffer, uint32_t flags=0) override
bool readFile(int fd, Container &out, size_t num_bytes=std::numeric_limits< size_t >::max())
Definition: FileUtil.h:125
flags
Definition: http_parser.h:127
std::string str() const
Definition: Range.h:591
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
std::chrono::microseconds getSleepUS() const
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PolymorphicMatcher< internal::MatchesRegexMatcher > ContainsRegex(const internal::RE *regex)
size_type find(const_range_type str) const
Definition: Range.h:721
ssize_t readNoInt(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:102
void advance(size_type n)
Definition: Range.h:672
ssize_t readFull(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:126
constexpr size_type size() const
Definition: Range.h:431
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void writeThread(AsyncFileWriter *writer, size_t id, uint32_t flags, ReadStats *readStats)
bool prefix(Cursor &c, uint32_t expected)
#define SKIP()
Definition: TestUtils.h:55
uint64_t getOSThreadID()
Definition: ThreadId.h:80
static constexpr StringPiece kMsgSuffix
void setTry(Try< T > &&t)
Definition: Promise-inl.h:122
folly::Synchronized< std::unordered_map< size_t, WriterData > > perThreadWriteData_
const std::chrono::steady_clock::time_point deadline_
static void stop()
LogLevel min
Definition: LogLevel.cpp:30
int fd() const
Definition: File.h:85
void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags)
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
constexpr Iter data() const
Definition: Range.h:446
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
ssize_t writeNoInt(int fd, const void *buf, size_t count)
Definition: FileUtil.cpp:114
Range subpiece(size_type first, size_type length=npos) const
Definition: Range.h:686
Future< T > getFuture()
Definition: Promise-inl.h:97
Definition: Traits.h:594
void messageReceived(StringPiece msg)
#define FOLLY_SAFE_CHECK(expr, msg)
Definition: SafeAssert.h:35
bool shouldWriterStop() const
std::unordered_map< size_t, ReaderData > perThreadReadData_
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
void trailingData(StringPiece data)
static void setInternalWarningHandler(InternalWarningHandler handler)
Definition: LoggerDB.cpp:656
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
#define EXPECT_THAT(value, matcher)
bool startsWith(const const_range_type &other) const
Definition: Range.h:828
static const size_type npos
Definition: Range.h:197
std::enable_if< !std::is_same< invoke_result_t< F >, void >::value, Try< invoke_result_t< F > > >::type makeTryWith(F &&f)
Definition: Try-inl.h:223
std::mutex mutex
const char * string
Definition: Conv.cpp:212
void parseMessage(StringPiece msg, size_t *threadID, size_t *messageIndex)
void subtract(size_type n)
Definition: Range.h:679
#define XLOGF(level, fmt, arg1,...)
Definition: xlog.h:77
std::string trailingData_
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
#define XLOG(level,...)
Definition: xlog.h:57
bool endsWith(const const_range_type &other) const
Definition: Range.h:849
void throwSystemError(Args &&...args)
Definition: Exception.h:76
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
int close(NetworkSocket s)
Definition: NetOps.cpp:90
TEST(SequencedExecutor, CPUThreadPoolExecutor)
void readThread(folly::File &&file, ReadStats *stats)
DEFINE_int64(async_discard_num_normal_writers, 30,"number of threads to use to generate normal log messages during ""the AsyncFileWriter.discard test")
void pipe(CPUExecutor cpu, IOExecutor io)
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934