proxygen
ReadStats Class Reference

Classes

struct  ReaderData
 
struct  WriterData
 

Public Member Functions

 ReadStats ()
 
void clearSleepDuration ()
 
std::chrono::microseconds getSleepUS () const
 
bool shouldWriterStop () const
 
void writerFinished (size_t threadID, size_t messagesWritten, uint32_t flags)
 
void check ()
 
void messageReceived (StringPiece msg)
 
void trailingData (StringPiece data)
 

Private Member Functions

void parseMessage (StringPiece msg, size_t *threadID, size_t *messageIndex)
 

Private Attributes

std::unordered_map< size_t, ReaderDataperThreadReadData_
 
std::string trailingData_
 
size_t numUnableToParse_ {0}
 
size_t numOutOfOrder_ {0}
 
size_t numDiscarded_ {0}
 
const std::chrono::steady_clock::time_point deadline_
 
std::atomic< uint64_treadSleepUS_ {0}
 
std::atomic< uint64_tdiscardEventsSeen_ {0}
 
folly::Synchronized< std::unordered_map< size_t, WriterData > > perThreadWriteData_
 

Detailed Description

Definition at line 271 of file AsyncFileWriterTest.cpp.

Constructor & Destructor Documentation

ReadStats::ReadStats ( )
inline

Definition at line 273 of file AsyncFileWriterTest.cpp.

References int64_t, min, now(), and uint64_t.

275  milliseconds{FLAGS_async_discard_timeout_msec}},
276  readSleepUS_{static_cast<uint64_t>(
277  std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
std::chrono::steady_clock::time_point now()
const std::chrono::steady_clock::time_point deadline_
LogLevel min
Definition: LogLevel.cpp:30
std::atomic< uint64_t > readSleepUS_

Member Function Documentation

void ReadStats::check ( )
inline

Definition at line 311 of file AsyncFileWriterTest.cpp.

References folly::DBG1, EXPECT_EQ, EXPECT_GT, EXPECT_LE, folly::LogWriter::NEVER_DISCARD, and XLOG.

Referenced by TEST().

311  {
312  auto writeDataMap = perThreadWriteData_.wlock();
313 
317 
318  // Check messages received from each writer thread
319  size_t readerStatsChecked = 0;
320  size_t totalMessagesWritten = 0;
321  size_t totalMessagesRead = 0;
322  for (const auto& writeEntry : *writeDataMap) {
323  const auto& writeInfo = writeEntry.second;
324  totalMessagesWritten += writeInfo.numMessagesWritten;
325 
326  auto iter = perThreadReadData_.find(writeEntry.first);
327  if (iter == perThreadReadData_.end()) {
328  // We never received any messages from this writer thread.
329  // This is okay as long as this is not a NEVER_DISCARD writer.
330  EXPECT_EQ(0, writeInfo.flags);
331  continue;
332  }
333  const auto& readInfo = iter->second;
334  ++readerStatsChecked;
335  totalMessagesRead += readInfo.numMessagesRead;
336  if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
337  // Non-discarding threads should never discard anything
338  EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
339  EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
340  } else {
341  // Other threads may have discarded some messages
342  EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
343  EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
344  }
345  }
346 
347  EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
348  EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
349 
350  // This test is intended to check the discard behavior.
351  // Fail the test if we didn't actually trigger any discards before we timed
352  // out.
354 
355  XLOG(DBG1) << totalMessagesWritten << " messages written, "
356  << totalMessagesRead << " messages read, " << numDiscarded_
357  << " messages discarded";
358  }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
folly::Synchronized< std::unordered_map< size_t, WriterData > > perThreadWriteData_
std::unordered_map< size_t, ReaderData > perThreadReadData_
std::string trailingData_
#define XLOG(level,...)
Definition: xlog.h:57
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934
void ReadStats::clearSleepDuration ( )
inline

Definition at line 279 of file AsyncFileWriterTest.cpp.

Referenced by TEST().

279  {
280  readSleepUS_.store(0);
281  }
std::atomic< uint64_t > readSleepUS_
std::chrono::microseconds ReadStats::getSleepUS ( ) const
inline

Definition at line 282 of file AsyncFileWriterTest.cpp.

Referenced by readThread().

282  {
283  return std::chrono::microseconds{readSleepUS_.load()};
284  }
std::atomic< uint64_t > readSleepUS_
void ReadStats::messageReceived ( StringPiece  msg)
inline

Definition at line 360 of file AsyncFileWriterTest.cpp.

References folly::data(), folly::DBG3, folly::Range< Iter >::endsWith(), folly::ERR, folly::Range< Iter >::find(), folly::Range< Iter >::subpiece(), and XLOG.

Referenced by readThread().

360  {
361  if (msg.endsWith(" log messages discarded: "
362  "logging faster than we can write")) {
363  auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
364  XLOG(DBG3, "received discard notification: ", discardCount);
365  numDiscarded_ += discardCount;
367  return;
368  }
369 
370  size_t threadID = 0;
371  size_t messageIndex = 0;
372  try {
373  parseMessage(msg, &threadID, &messageIndex);
374  } catch (const std::exception& ex) {
376  XLOG(ERR, "unable to parse log message: ", msg);
377  return;
378  }
379 
380  auto& data = perThreadReadData_[threadID];
381  data.numMessagesRead++;
382  if (messageIndex > data.lastId) {
383  data.lastId = messageIndex;
384  } else {
385  ++numOutOfOrder_;
386  XLOG(ERR) << "received out-of-order messages from writer " << threadID
387  << ": " << messageIndex << " received after " << data.lastId;
388  }
389  }
size_type find(const_range_type str) const
Definition: Range.h:721
std::atomic< uint64_t > discardEventsSeen_
Range subpiece(size_type first, size_type length=npos) const
Definition: Range.h:686
std::unordered_map< size_t, ReaderData > perThreadReadData_
void parseMessage(StringPiece msg, size_t *threadID, size_t *messageIndex)
#define XLOG(level,...)
Definition: xlog.h:57
bool endsWith(const const_range_type &other) const
Definition: Range.h:849
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
void ReadStats::parseMessage ( StringPiece  msg,
size_t *  threadID,
size_t *  messageIndex 
)
inlineprivate

Definition at line 405 of file AsyncFileWriterTest.cpp.

References folly::Range< Iter >::advance(), folly::Range< Iter >::endsWith(), folly::Range< Iter >::find(), kMsgSuffix, folly::Range< const char * >::npos, prefix(), folly::Range< Iter >::size(), folly::Range< Iter >::startsWith(), folly::Range< Iter >::subpiece(), and folly::Range< Iter >::subtract().

405  {
406  // Validate and strip off the message prefix and suffix
407  constexpr StringPiece prefix{"thread "};
408  if (!msg.startsWith(prefix)) {
409  throw std::runtime_error("bad message prefix");
410  }
411  msg.advance(prefix.size());
412  if (!msg.endsWith(kMsgSuffix)) {
413  throw std::runtime_error("bad message suffix");
414  }
415  msg.subtract(kMsgSuffix.size());
416 
417  // Parse then strip off the thread index
418  auto threadIDEnd = msg.find(' ');
419  if (threadIDEnd == StringPiece::npos) {
420  throw std::runtime_error("no middle found");
421  }
422  *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
423  msg.advance(threadIDEnd);
424 
425  // Validate that the middle of the message is what we expect,
426  // then strip it off
427  constexpr StringPiece middle{" message "};
428  if (!msg.startsWith(middle)) {
429  throw std::runtime_error("bad message middle");
430  }
431  msg.advance(middle.size());
432 
433  // Parse the message index
434  *messageIndex = folly::to<size_t>(msg);
435  }
size_type find(const_range_type str) const
Definition: Range.h:721
void advance(size_type n)
Definition: Range.h:672
constexpr size_type size() const
Definition: Range.h:431
bool prefix(Cursor &c, uint32_t expected)
static constexpr StringPiece kMsgSuffix
Range subpiece(size_type first, size_type length=npos) const
Definition: Range.h:686
bool startsWith(const const_range_type &other) const
Definition: Range.h:828
void subtract(size_type n)
Definition: Range.h:679
bool endsWith(const const_range_type &other) const
Definition: Range.h:849
bool ReadStats::shouldWriterStop ( ) const
inline

Definition at line 286 of file AsyncFileWriterTest.cpp.

References now(), and uint64_t.

Referenced by writeThread().

286  {
287  // Stop after we have seen the required number of separate discard events.
288  // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
289  // ensures the async writer blocks and then makes progress again multiple
290  // times.
291  if (FLAGS_async_discard_num_events > 0 &&
292  discardEventsSeen_.load() >
293  static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
294  return true;
295  }
296 
297  // Stop after a timeout, even if we don't hit the number of requested
298  // discards.
299  return steady_clock::now() > deadline_;
300  }
std::chrono::steady_clock::time_point now()
std::atomic< uint64_t > discardEventsSeen_
const std::chrono::steady_clock::time_point deadline_
void ReadStats::trailingData ( StringPiece  data)
inline

Definition at line 391 of file AsyncFileWriterTest.cpp.

References folly::Range< Iter >::str().

Referenced by readThread().

391  {
392  trailingData_ = data.str();
393  }
std::string str() const
Definition: Range.h:591
std::string trailingData_
void ReadStats::writerFinished ( size_t  threadID,
size_t  messagesWritten,
uint32_t  flags 
)
inline

Definition at line 301 of file AsyncFileWriterTest.cpp.

References folly::data(), and FOLLY_SAFE_CHECK.

Referenced by writeThread().

301  {
302  auto map = perThreadWriteData_.wlock();
304  map->find(threadID) == map->end(),
305  "multiple writer threads with same ID");
306  auto& data = (*map)[threadID];
307  data.numMessagesWritten = messagesWritten;
308  data.flags = flags;
309  }
flags
Definition: http_parser.h:127
folly::Synchronized< std::unordered_map< size_t, WriterData > > perThreadWriteData_
Definition: Traits.h:594
#define FOLLY_SAFE_CHECK(expr, msg)
Definition: SafeAssert.h:35
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43

Member Data Documentation

const std::chrono::steady_clock::time_point ReadStats::deadline_
private

deadline_ is a maximum end time for the test.

The writer threads quit if the deadline is reached even if they have not produced the desired number of discard events yet.

Definition at line 466 of file AsyncFileWriterTest.cpp.

std::atomic<uint64_t> ReadStats::discardEventsSeen_ {0}
private

A count of how many discard events have been seen so far.

The reader increments discardEventsSeen_ each time it sees a discard notification message. A "discard event" basically corresponds to a single group of dropped messages. Once the reader pulls some messages off out of the pipe the writers should be able to send more data, but the buffer will eventually fill up again, producing another discard event.

Definition at line 490 of file AsyncFileWriterTest.cpp.

size_t ReadStats::numDiscarded_ {0}
private

Definition at line 458 of file AsyncFileWriterTest.cpp.

size_t ReadStats::numOutOfOrder_ {0}
private

Definition at line 457 of file AsyncFileWriterTest.cpp.

size_t ReadStats::numUnableToParse_ {0}
private

Definition at line 456 of file AsyncFileWriterTest.cpp.

std::unordered_map<size_t, ReaderData> ReadStats::perThreadReadData_
private

Data about each writer thread, as recorded by the reader thread.

At the end of the test we will compare perThreadReadData_ (recorded by the reader) with perThreadWriteData_ (recorded by the writers) to make sure the data matches up.

This is a map from writer_thread_id to ReaderData. The writer_thread_id is extracted from the received messages.

This field does not need locking as it is only updated by the single reader thread.

Definition at line 450 of file AsyncFileWriterTest.cpp.

folly::Synchronized<std::unordered_map<size_t, WriterData> > ReadStats::perThreadWriteData_
private

Data about each writer thread, as recorded by the writers.

When each writer thread finishes it records how many messages it wrote, plus the flags it used to write the messages.

Definition at line 499 of file AsyncFileWriterTest.cpp.

std::atomic<uint64_t> ReadStats::readSleepUS_ {0}
private

How long the reader thread should sleep between each read event.

This is initially set to a non-zero value (read from the FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads slowly, which will fill up the pipe buffer and cause discard events.

Once we have produce enough discards and are ready to finish the test the main thread reduces readSleepUS_ to 0, so the reader will finish the remaining message backlog quickly.

Definition at line 479 of file AsyncFileWriterTest.cpp.

std::string ReadStats::trailingData_
private

Definition at line 455 of file AsyncFileWriterTest.cpp.


The documentation for this class was generated from the following file: