DEFINE_int64(
    async_discard_num_normal_writers,
    30,
    "number of threads to use to generate normal log messages during "
    "the AsyncFileWriter.discard test");

DEFINE_int64(
    async_discard_num_nodiscard_writers,
    2,
    "number of threads to use to generate non-discardable log messages during "
    "the AsyncFileWriter.discard test");

DEFINE_int64(
    async_discard_read_sleep_usec,
    500,
    "how long the read thread should sleep between reads in "
    "the AsyncFileWriter.discard test");

DEFINE_int64(
    async_discard_timeout_msec,
    10000,
    "A timeout for the AsyncFileWriter.discard test if it cannot generate "
    "enough discards");

DEFINE_int64(
    async_discard_num_events,
    10,
    "The number of discard events to wait for in the AsyncFileWriter.discard "
    "test");

using namespace folly;
using namespace std::literals::chrono_literals;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
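// Note (standard gflags behavior, not part of the original file): each
// DEFINE_int64() above produces an int64_t FLAGS_<name> variable and a
// matching command-line flag, so the test parameters can be tuned at run
// time, e.g.:
//   ./async_file_writer_test --async_discard_timeout_msec=30000 \
//       --async_discard_num_events=20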

// Excerpt: write a batch of messages through the AsyncFileWriter under test.
  for (int n = 0; n < 10; ++n) {
    writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
  }

// Capture LoggerDB internal warnings so the test can inspect the errors
// reported while writing to a broken pipe.
static std::vector<std::string>* internalWarnings;

void handleLoggingError(
    StringPiece /* file */, int /* lineNumber */, std::string&& msg) {
  internalWarnings->emplace_back(std::move(msg));
}

// ...

  // Inside the test body: point the warning handler at a local vector.
  std::vector<std::string> logErrors;
  internalWarnings = &logErrors;

  std::array<int, 2> fds;
  auto rc = pipe(fds.data());
  checkUnixError(rc, "failed to create pipe");
  // Ignore SIGPIPE so writes to the closed pipe fail with EPIPE rather than
  // terminating the process.
  signal(SIGPIPE, SIG_IGN);

  // Write several messages after the pipe's read end has been closed.
  size_t numMessages = 100;
  for (size_t n = 0; n < numMessages; ++n) {
    writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
  }
161 "An established connection was aborted by the software in your host machine\\.";

  for (const auto& msg : logErrors) {
    EXPECT_THAT(
        msg,
        testing::ContainsRegex(
            "error writing to log file .* in AsyncFileWriter.*: " +
            kExpectedErrorMessage));
  }
  // Each failed write should be reported at most once.
  EXPECT_LE(logErrors.size(), numMessages);

// Fill the pipe referred to by fd until even a 1-byte write would block, so
// that the AsyncFileWriter's I/O thread cannot make progress.
size_t fillUpPipe(int fd) {
  int flags = fcntl(fd, F_GETFL);
  checkUnixError(flags, "fcntl(F_GETFL) failed");
  auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  checkUnixError(rc, "fcntl(F_SETFL) failed");

  std::vector<char> data;
  data.resize(4000); // initial write-chunk size
  size_t totalBytes = 0;
  size_t bytesToWrite = data.size();
  while (true) {
    auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
    if (bytesWritten < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // The pipe is full; keep halving the write size until even a single
        // byte no longer fits.
        if (bytesToWrite <= 1) {
          break;
        }
        bytesToWrite /= 2;
      } else {
        throwSystemError("error writing to pipe");
      }
    } else {
      totalBytes += bytesWritten;
    }
  }
  XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");

  // Restore blocking mode before returning.
  rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
  checkUnixError(rc, "fcntl(F_SETFL) failed");
  return totalBytes;
}

// Flush test (excerpt): fill the pipe so the writer thread cannot make
// progress, then verify that flush() blocks until the data is consumed.
  std::array<int, 2> fds;
  auto rc = pipe(fds.data());
  checkUnixError(rc, "failed to create pipe");
  File readPipe{fds[0], true};
  File writePipe{fds[1], true};

  auto paddingSize = fillUpPipe(writePipe.fd());

  // Write a message; it cannot reach the pipe until the padding is drained.
  writer.writeMessage("test message: " + std::string(200, 'x'));

  // flush() will block, so run it in a separate thread.
  // ("promise" is a folly::Promise set up earlier to report the result.)
  auto flushFunction = [&] { writer.flush(); };
  std::thread flushThread{
      [&] { promise.setTry(makeTryWith(flushFunction)); }};
  flushThread.detach();

  // ...
  std::this_thread::sleep_for(10ms);

  // Drain the padding bytes from the pipe; this unblocks the writer thread
  // and allows the flush to complete.
  std::vector<char> buf;
  buf.resize(paddingSize);
  auto bytesRead = readFull(readPipe.fd(), buf.data(), buf.size());
266 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" 267 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" 268 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" 269 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};

// ReadStats collects statistics about the messages the reader receives, so
// the test can check them at the end.
class ReadStats {
 public:
  ReadStats()
      : deadline_{steady_clock::now() +
                  milliseconds{FLAGS_async_discard_timeout_msec}},
        readSleepUS_{
            static_cast<uint64_t>(FLAGS_async_discard_read_sleep_usec)} {}

  // Stop making the reader sleep, letting it catch up at the end of the test.
  void clearSleepDuration() {
    readSleepUS_.store(0);
  }
  std::chrono::microseconds getSleepUS() const {
    return std::chrono::microseconds{readSleepUS_.load()};
  }

  // Writer threads poll this to decide when to stop.
  bool shouldWriterStop() const {
    // Stop once enough discard events have been seen...
    if (FLAGS_async_discard_num_events > 0 &&
        discardEventsSeen_.load() >
            static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
      return true;
    }
    // ...or once the test deadline has passed.
    return steady_clock::now() > deadline_;
  }

  void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
    auto map = perThreadWriteData_.wlock();
    FOLLY_SAFE_CHECK(
        map->find(threadID) == map->end(),
        "multiple writer threads with same ID");
    auto& data = (*map)[threadID];
    data.numMessagesWritten = messagesWritten;
    data.flags = flags;
  }

  // Verify the reader's accounting against what the writers recorded.
  void check() {
    auto writeDataMap = perThreadWriteData_.wlock();

    // ...
    size_t readerStatsChecked = 0;
    size_t totalMessagesWritten = 0;
    size_t totalMessagesRead = 0;
    for (const auto& writeEntry : *writeDataMap) {
      const auto& writeInfo = writeEntry.second;
      totalMessagesWritten += writeInfo.numMessagesWritten;

      auto iter = perThreadReadData_.find(writeEntry.first);
      if (iter == perThreadReadData_.end()) {
        // The reader saw nothing from this writer; acceptable only if the
        // writer's messages were discardable.
        EXPECT_FALSE(writeInfo.flags & LogWriter::NEVER_DISCARD);
        continue;
      }
      const auto& readInfo = iter->second;
      ++readerStatsChecked;
      totalMessagesRead += readInfo.numMessagesRead;
      if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
        // Non-discardable writers must have every message delivered.
        EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
        EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
      } else {
        EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
        EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
      }
    }
    // Every message was either read or accounted for as discarded.
    EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
    EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());

    // ...
    XLOG(DBG1) << totalMessagesWritten << " messages written, "
               << totalMessagesRead << " messages read, " << numDiscarded_
               << " messages discarded";
  }

  void messageReceived(StringPiece msg) {
    // AsyncFileWriter reports dropped messages inline in the log stream;
    // count them rather than treating them as writer messages.
    if (msg.endsWith(
            " log messages discarded: "
            "logging faster than we can write")) {
      auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
      XLOG(DBG3, "received discard notification: ", discardCount);
      numDiscarded_ += discardCount;
      ++discardEventsSeen_;
      return;
    }
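    // Illustrative note (an assumption about the notification format, based
    // on the endsWith() check above): a discard notification looks like
    //   "137 log messages discarded: logging faster than we can write"
    // so subpiece(0, find(' ')) yields the leading count ("137" here).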
    size_t threadID = 0;
    size_t messageIndex = 0;
    try {
      parseMessage(msg, &threadID, &messageIndex);
    } catch (const std::exception& ex) {
      ++numUnableToParse_;
      XLOG(ERR, "unable to parse log message: ", msg);
      return;
    }

    auto& data = perThreadReadData_[threadID];
    data.numMessagesRead++;
    if (messageIndex > data.lastId) {
      data.lastId = messageIndex;
    } else {
      ++numOutOfOrder_;
      XLOG(ERR) << "received out-of-order messages from writer " << threadID
                << ": " << messageIndex << " received after " << data.lastId;
    }
  }

  // Record any unterminated data left in the read buffer at EOF.
  void trailingData(StringPiece data) {
    trailingData_ = data.str();
  }

  struct ReaderData {
    size_t numMessagesRead{0};
    size_t lastId{0};
  };
  struct WriterData {
    size_t numMessagesWritten{0};
    uint32_t flags{0};
  };

  // Parse a log line of the form "thread <id> message <index><kMsgSuffix>",
  // throwing if any piece does not match.
  void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
    constexpr StringPiece prefix{"thread "};
    if (!msg.startsWith(prefix)) {
      throw std::runtime_error("bad message prefix");
    }
    msg.advance(prefix.size());

    if (!msg.endsWith(kMsgSuffix)) {
      throw std::runtime_error("bad message suffix");
    }
    msg.subtract(kMsgSuffix.size());

    auto threadIDEnd = msg.find(' ');
    if (threadIDEnd == StringPiece::npos) {
      throw std::runtime_error("no middle found");
    }
    *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
    msg.advance(threadIDEnd);

    constexpr StringPiece middle{" message "};
    if (!msg.startsWith(middle)) {
      throw std::runtime_error("bad message middle");
    }
    msg.advance(middle.size());

    *messageIndex = folly::to<size_t>(msg);
  }

  // Reader-side counters (only accessed from the reader thread).
  size_t numUnableToParse_{0};
  size_t numOutOfOrder_{0};
  size_t numDiscarded_{0};

  // ...
  // How long the reader should sleep between reads, in microseconds.
  std::atomic<uint64_t> readSleepUS_{0};

  // ...
  // The number of discard notifications the reader has seen so far.
  std::atomic<uint64_t> discardEventsSeen_{0};
};
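
// Illustrative round-trip sketch (not part of the original test): build a
// line the way writeThread() does, then take it apart with the same
// folly::Range operations parseMessage() uses.  The function name is
// hypothetical.
void messageFormatSketch() {
  auto msg = folly::to<std::string>(
      "thread ", 7, " message ", 42, kMsgSuffix, '\n');

  folly::StringPiece line{msg};
  line.subtract(1); // the read thread strips the '\n' when splitting lines
  line.advance(folly::StringPiece{"thread "}.size());
  line.subtract(kMsgSuffix.size());
  // line is now "7 message 42"
  auto threadIDEnd = line.find(' ');
  auto threadID = folly::to<size_t>(line.subpiece(0, threadIDEnd)); // 7
  line.advance(threadIDEnd + folly::StringPiece{" message "}.size());
  auto messageIndex = folly::to<size_t>(line); // 42
  (void)threadID;
  (void)messageIndex;
}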

// The reader thread: drain the pipe, split the stream into lines, and feed
// each complete line to the ReadStats object.
void readThread(folly::File&& file, ReadStats* stats) {
  std::vector<char> buffer(1024); // exact buffer size elided in the excerpt
  size_t bufferIdx = 0;
  while (true) {
    // Sleeping between reads lets the writers outpace the reader.
    std::this_thread::sleep_for(stats->getSleepUS());
    auto readResult = readNoInt(
        file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
    if (readResult < 0) {
      XLOG(ERR, "error reading from pipe: ", errno);
      break;
    }
    if (readResult == 0) {
      break; // EOF: every writer has closed its end of the pipe
    }
    auto logDataLen = bufferIdx + readResult;
    StringPiece logData{buffer.data(), logDataLen};
    size_t idx = 0;
    while (true) {
      auto end = logData.find('\n', idx);
      if (end == StringPiece::npos) {
        // Partial line: keep the leftover bytes for the next read.
        bufferIdx = logDataLen - idx;
        memmove(buffer.data(), buffer.data() + idx, bufferIdx);
        break;
      }
      stats->messageReceived(logData.subpiece(idx, end - idx));
      idx = end + 1;
    }
  }
  if (bufferIdx != 0) {
    stats->trailingData(StringPiece{buffer.data(), bufferIdx});
  }
}
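
// A self-contained sketch (hypothetical helper, not in the original file) of
// the carry-over line splitting used above: complete lines go to a callback,
// and a trailing partial line is moved to the buffer front so the next read
// can append to it.  Assumes <cstring> and folly/Range.h are available.
template <typename Callback>
size_t splitLines(std::vector<char>& buffer, size_t len, Callback&& cb) {
  folly::StringPiece data{buffer.data(), len};
  size_t idx = 0;
  while (true) {
    auto end = data.find('\n', idx);
    if (end == folly::StringPiece::npos) {
      size_t leftover = len - idx;
      memmove(buffer.data(), buffer.data() + idx, leftover);
      return leftover; // the new bufferIdx for the next read
    }
    cb(data.subpiece(idx, end - idx));
    idx = end + 1;
  }
}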

// From writeThread() (excerpt): each message carries the writer's ID and a
// sequence number, padded with kMsgSuffix so that messages are large.
    writer->writeMessage(
        folly::to<std::string>(
            "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
        flags);

// The AsyncFileWriter.discard test body (excerpt): writers flood a pipe
// while a deliberately slow reader forces discards.
  std::array<int, 2> fds;
  auto pipeResult = pipe(fds.data());
  checkUnixError(pipeResult, "failed to create pipe");

  // ...
  std::vector<std::thread> writeThreads;
  size_t numThreads = FLAGS_async_discard_num_normal_writers +
      FLAGS_async_discard_num_nodiscard_writers;

  for (size_t n = 0; n < numThreads; ++n) {
    uint32_t flags = 0;
    if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
      flags = LogWriter::NEVER_DISCARD;
    }
    XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
    writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
  }

  for (auto& t : writeThreads) {
    t.join();
  }

// Fork test (excerpt): verify that an AsyncFileWriter keeps working in both
// the parent and the child across a fork(), even while background threads
// are logging concurrently.
#if FOLLY_HAVE_PTHREAD_ATFORK
  constexpr size_t numMessages = 10;
  constexpr size_t numBgThreads = 2;

  // ...
  constexpr milliseconds sleepDuration(0);

  // ...
  writer.writeMessage(folly::to<std::string>("parent pid=", getpid(), "\n"));

  // Background threads log continuously while the fork happens.
  std::vector<std::thread> bgThreads;
  std::atomic<bool> stop{false};
  for (size_t n = 0; n < numBgThreads; ++n) {
    bgThreads.emplace_back([&] {
      size_t iter = 0;
      while (!stop) {
        writer.writeMessage(
            folly::to<std::string>("bgthread_", getpid(), "_", iter, "\n"));
        ++iter;
      }
    });
  }

  for (size_t n = 0; n < numMessages; ++n) {
    writer.writeMessage(folly::to<std::string>("prefork", n, "\n"));
  }

  auto pid = fork();
  if (pid == 0) {
    // Child process.
    writer.writeMessage(folly::to<std::string>("child pid=", getpid(), "\n"));
    for (size_t n = 0; n < numMessages; ++n) {
      writer.writeMessage(folly::to<std::string>("child", n, "\n"));
      std::this_thread::sleep_for(sleepDuration);
    }
    // ...
  }

  // Parent process.
  for (size_t n = 0; n < numMessages; ++n) {
    writer.writeMessage(folly::to<std::string>("parent", n, "\n"));
    std::this_thread::sleep_for(sleepDuration);
  }

  stop = true;
  for (auto& t : bgThreads) {
    t.join();
  }
  int status = 0;
  auto waited = waitpid(pid, &status, 0);
  // ...

  // Check that the messages written before the fork made it to the log.
  // ("data" holds the log file contents, read back earlier.)
  for (size_t n = 0; n < numMessages; ++n) {
    EXPECT_THAT(
        data, ContainsRegex(folly::to<std::string>("prefork", n, "\n")));
  }
#else
  SKIP() << "pthread_atfork() is not supported on this platform";
#endif // FOLLY_HAVE_PTHREAD_ATFORK

// Fork stress test (excerpt): several threads fork() repeatedly while other
// threads keep logging through their own AsyncFileWriter objects.
#if FOLLY_HAVE_PTHREAD_ATFORK
  constexpr size_t numAsyncWriterThreads = 10;
  constexpr size_t numForkThreads = 5;
  constexpr size_t numForkIterations = 20;
  std::atomic<bool> stop{false};

  // ...
  std::vector<std::thread> asyncWriterThreads;
  for (size_t n = 0; n < numAsyncWriterThreads; ++n) {
    asyncWriterThreads.emplace_back([n, &stop] {
      // Each thread logs in a loop until told to stop; the writer setup and
      // the message contents are elided in this excerpt.
      writer.writeMessage(folly::to<std::string>(/* ... */));
      // ...
    });
  }

  // Threads that fork() in lockstep, released by a condition variable so
  // that the forks overlap with each other and with the writer threads.
  std::vector<std::thread> forkThreads;
  std::mutex forkStartMutex;
  std::condition_variable forkStartCV;
  bool forkStart = false;
  for (size_t n = 0; n < numForkThreads; ++n) {
    forkThreads.emplace_back([n, &forkStartMutex, &forkStartCV, &forkStart] {
      // Wait until all fork threads are ready to start.
      {
        std::unique_lock<std::mutex> l(forkStartMutex);
        forkStartCV.wait(l, [&forkStart] { return forkStart; });
      }
      for (size_t i = 0; i < numForkIterations; ++i) {
        auto pid = fork();
        // ... (the child exits; the parent waits for it)
        int status = 0;
        auto waited = waitpid(pid, &status, 0);
        // ...
      }
    });
  }

  // Release all the fork threads at once.
  {
    std::unique_lock<std::mutex> l(forkStartMutex);
    forkStart = true;
  }
  forkStartCV.notify_all();

  for (auto& t : forkThreads) {
    t.join();
  }

  stop = true;
  for (auto& t : asyncWriterThreads) {
    t.join();
  }
#else
  SKIP() << "pthread_atfork() is not supported on this platform";
#endif // FOLLY_HAVE_PTHREAD_ATFORK