proxygen
folly::AsyncFileWriter Class Reference

#include <AsyncFileWriter.h>

Inheritance diagram for folly::AsyncFileWriter:
folly::LogWriter

Classes

struct  Data
 

Public Member Functions

 AsyncFileWriter (folly::StringPiece path)
 
 AsyncFileWriter (folly::File &&file)
 
 ~AsyncFileWriter ()
 
void writeMessage (folly::StringPiece buffer, uint32_t flags=0) override
 
void writeMessage (std::string &&buffer, uint32_t flags=0) override
 
void flush () override
 
bool ttyOutput () const override
 
void setMaxBufferSize (size_t size)
 
size_t getMaxBufferSize () const
 
const folly::File & getFile () const
 
- Public Member Functions inherited from folly::LogWriter
virtual ~LogWriter ()
 

Static Public Attributes

static constexpr size_t kDefaultMaxBufferSize = 1024 * 1024
 

Private Types

enum  Flags : uint32_t {
  FLAG_IO_THREAD_STARTED = 0x01, FLAG_DESTROYING = 0x02, FLAG_STOP = 0x04, FLAG_IO_THREAD_STOPPED = 0x08,
  FLAG_IO_THREAD_JOINED = 0x10
}
 

Private Member Functions

void ioThread ()
 
void performIO (std::vector< std::string > *ioQueue, size_t numDiscarded)
 
void onIoError (const std::exception &ex)
 
std::string getNumDiscardedMsg (size_t numDiscarded)
 
bool preFork ()
 
void postForkParent ()
 
void postForkChild ()
 
void stopIoThread (folly::Synchronized< Data, std::mutex >::LockedPtr &data, uint32_t extraFlags)
 
void restartThread ()
 

Private Attributes

folly::File file_
 
folly::Synchronized< Data, std::mutex > data_
 
std::condition_variable messageReady_
 
std::condition_variable ioCV_
 
folly::Synchronized< Data, std::mutex >::LockedPtr lockedData_
 

Additional Inherited Members

- Public Types inherited from folly::LogWriter
enum  Flags : uint32_t { NO_FLAGS = 0x00, NEVER_DISCARD = 0x01 }
 

Detailed Description

A LogWriter implementation that asynchronously writes to a file descriptor.

This class performs the log I/O in a separate thread.

The advantage of this class over ImmediateFileWriter is that logging I/O can never slow down or block your normal program operation. If log messages are generated faster than they can be written, messages will be dropped, and a note saying how many messages were dropped will be written to the log file once the writer is able to catch up.

However, one downside is that if your program crashes, not all log messages may have been written, so you may lose messages generated immediately before the crash.

Definition at line 44 of file AsyncFileWriter.h.

Member Enumeration Documentation

Enumerator
FLAG_IO_THREAD_STARTED 
FLAG_DESTROYING 
FLAG_STOP 
FLAG_IO_THREAD_STOPPED 
FLAG_IO_THREAD_JOINED 

Definition at line 103 of file AsyncFileWriter.h.

103  : uint32_t {
104  // FLAG_IO_THREAD_STARTED indicates that the constructor has started the
105  // I/O thread.
106  FLAG_IO_THREAD_STARTED = 0x01,
107  // FLAG_DESTROYING indicates that the destructor is running and destroying
108  // the I/O thread.
109  FLAG_DESTROYING = 0x02,
110  // FLAG_STOP indicates that the I/O thread has been asked to stop.
111  // This is set either by the destructor or by preFork()
112  FLAG_STOP = 0x04,
113  // FLAG_IO_THREAD_STOPPED indicates that the I/O thread is about to return
114  // and can now be joined. ioCV_ will be signalled when this flag is set.
115  FLAG_IO_THREAD_STOPPED = 0x08,
116  // FLAG_IO_THREAD_JOINED indicates that the I/O thread has been joined.
117  FLAG_IO_THREAD_JOINED = 0x10,
118  };
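The flag values above are single bits, so they can be combined in one uint32_t and tested or cleared with masks. A minimal sketch of that pattern follows. The helper names isSet() and clearStopBits() are hypothetical and do not appear in AsyncFileWriter; which bits the real class clears on restart is an assumption here, guided only by the flags that restartThread() is listed as referencing.

```cpp
#include <cstdint>

// Same values as the private enum documented above.
enum Flags : std::uint32_t {
  FLAG_IO_THREAD_STARTED = 0x01,
  FLAG_DESTROYING = 0x02,
  FLAG_STOP = 0x04,
  FLAG_IO_THREAD_STOPPED = 0x08,
  FLAG_IO_THREAD_JOINED = 0x10,
};

// Hypothetical helper: true if bit `f` is set in `flags`.
inline bool isSet(std::uint32_t flags, Flags f) {
  return (flags & f) != 0;
}

// Hypothetical helper: clear the three stop-related bits and leave every
// other bit untouched.
inline std::uint32_t clearStopBits(std::uint32_t flags) {
  return flags & ~(FLAG_STOP | FLAG_IO_THREAD_STOPPED | FLAG_IO_THREAD_JOINED);
}
```

Because each flag occupies its own bit, setting FLAG_STOP never disturbs FLAG_IO_THREAD_STARTED, which is what lets several threads reason about the thread's lifecycle through one word protected by the data lock.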

Constructor & Destructor Documentation

folly::AsyncFileWriter::AsyncFileWriter ( folly::StringPiece  path)
explicit

Construct an AsyncFileWriter that appends to the file at the specified path.

Definition at line 31 of file AsyncFileWriter.cpp.

References folly::Range< Iter >::str().

32  : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {}
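The open flags in the listing above (O_WRONLY | O_APPEND | O_CREAT) mean the file is created if missing and every write lands at the current end of the file. The sketch below shows the same flags with plain POSIX calls instead of folly::File. The function name appendToFile() and the 0666 creation mode are assumptions made for illustration.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <string>

// Hypothetical helper: append `text` to the file at `path`, creating the
// file if it does not exist. Returns true if every byte was written.
inline bool appendToFile(const std::string& path, const std::string& text) {
  // 0666 is an assumed creation mode; the process umask still applies.
  int fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
  if (fd < 0) {
    return false;
  }
  ssize_t written = ::write(fd, text.data(), text.size());
  ::close(fd);
  return written == static_cast<ssize_t>(text.size());
}
```

Calling the helper twice on the same path leaves both pieces of text in the file, in call order, because O_APPEND positions each write at the end.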
folly::AsyncFileWriter::AsyncFileWriter ( folly::File &&  file)
explicit

Construct an AsyncFileWriter that writes to the specified File object.

Definition at line 33 of file AsyncFileWriter.cpp.

References folly::data(), data_, FLAG_IO_THREAD_STARTED, ioThread(), folly::gen::move, postForkChild(), postForkParent(), preFork(), and folly::detail::AtFork::registerHandler().

33  : file_{std::move(file)} {
34  AtFork::registerHandler(
35  this,
36  [this] { return preFork(); },
37  [this] { postForkParent(); },
38  [this] { postForkChild(); });
39 
40  // Start the I/O thread after registering the atfork handler.
41  // preFork() may be invoked in another thread as soon as registerHandler()
42  // returns. It will check FLAG_IO_THREAD_STARTED to see if the I/O thread is
43  // running yet.
44  {
45  auto data = data_.lock();
46  data->flags |= FLAG_IO_THREAD_STARTED;
47  data->ioThread = std::thread([this] { ioThread(); });
48  }
49 }
folly::AsyncFileWriter::~AsyncFileWriter ( )

Definition at line 51 of file AsyncFileWriter.cpp.

References folly::data(), data_, FLAG_DESTROYING, onIoError(), performIO(), stopIoThread(), and folly::detail::AtFork::unregisterHandler().

51  {
52  std::vector<std::string>* ioQueue;
53  size_t numDiscarded;
54  {
55  // Stop the I/O thread
56  auto data = data_.lock();
57  stopIoThread(data, FLAG_DESTROYING);
58 
59  // stopIoThread() causes the I/O thread to stop as soon as possible,
60  // without waiting for all pending messages to be written. Extract any
61  // remaining messages to write them below.
62  ioQueue = data->getCurrentQueue();
63  numDiscarded = data->numDiscarded;
64  }
65 
66  // Unregister the atfork handler after stopping the I/O thread.
67  // preFork(), postForkParent(), and postForkChild() calls can run
68  // concurrently with the destructor until unregisterHandler() returns.
69  AtFork::unregisterHandler(this);
70 
71  // If there are still any pending messages, flush them now.
72  if (!ioQueue->empty()) {
73  try {
74  performIO(ioQueue, numDiscarded);
75  } catch (const std::exception& ex) {
76  onIoError(ex);
77  }
78  }
79 }

Member Function Documentation

void folly::AsyncFileWriter::flush ( )
overridevirtual

Block until the I/O thread has finished writing all messages that were already enqueued when flush() was called.

Implements folly::LogWriter.

Definition at line 103 of file AsyncFileWriter.cpp.

References folly::data(), data_, ioCV_, messageReady_, and start.

103  {
104  auto data = data_.lock();
105  auto start = data->ioThreadCounter;
106 
107  // Wait until ioThreadCounter increments by at least two.
108  // Waiting for a single increment is not sufficient, as this happens after
109  // the I/O thread has swapped the queues, which is before it has actually
110  // done the I/O.
111  while (data->ioThreadCounter < start + 2) {
112  // Enqueue an empty string and wake the I/O thread.
113  // The empty string ensures that the I/O thread will break out of its wait
114  // loop and increment the ioThreadCounter, even if there is no other work
115  // to do.
116  data->getCurrentQueue()->emplace_back();
117  messageReady_.notify_one();
118 
119  // Wait for notification from the I/O thread that it has done work.
120  ioCV_.wait(data.getUniqueLock());
121  }
122 }
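The reason flush() waits for two counter increments is easier to see in a reduced model. The sketch below is a standard-library-only stand-in, not the folly implementation: the class name TinyAsyncWriter is hypothetical, it appends to an in-memory string instead of a file, and its destructor simply drops pending messages rather than writing them out. The two queues, the counter, and the two condition variables follow the listings on this page.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

class TinyAsyncWriter {
 public:
  TinyAsyncWriter() : thread_([this] { run(); }) {}

  ~TinyAsyncWriter() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    messageReady_.notify_one();
    thread_.join();
  }

  void write(std::string msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    currentQueue().push_back(std::move(msg));
    messageReady_.notify_one();
  }

  // Returns once everything enqueued before the call has been processed.
  void flush() {
    std::unique_lock<std::mutex> lock(mutex_);
    const std::uint64_t start = counter_;
    // The first increment only means the queue was swapped out; the second
    // can happen only after that queue has been fully processed.
    while (counter_ < start + 2) {
      currentQueue().emplace_back();  // empty string: wakes the I/O thread
      messageReady_.notify_one();
      ioCV_.wait(lock);
    }
  }

  std::string output() {
    std::lock_guard<std::mutex> lock(outputMutex_);
    return output_;
  }

 private:
  // Must be called with mutex_ held.
  std::vector<std::string>& currentQueue() { return queues_[counter_ & 1]; }

  void run() {
    while (true) {
      std::vector<std::string>* ioQueue;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        ioQueue = &currentQueue();
        messageReady_.wait(lock, [&] { return !ioQueue->empty() || stop_; });
        if (stop_) {
          return;
        }
        ++counter_;  // writers now append to the other queue
      }
      ioCV_.notify_all();
      {
        std::lock_guard<std::mutex> lock(outputMutex_);
        for (const auto& msg : *ioQueue) {
          output_ += msg;
        }
      }
      ioQueue->clear();
    }
  }

  std::mutex mutex_;
  std::condition_variable messageReady_;
  std::condition_variable ioCV_;
  std::vector<std::string> queues_[2];
  std::uint64_t counter_ = 0;
  bool stop_ = false;
  std::mutex outputMutex_;
  std::string output_;
  std::thread thread_;  // declared last so all other members exist first
};
```

After write("a"), write("b") and flush(), output() returns "ab" regardless of how the two messages were split across the queues, because the second increment is only reachable after the queue holding the earlier messages has been drained.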
const folly::File& folly::AsyncFileWriter::getFile ( ) const
inline

Get the output file.

Definition at line 98 of file AsyncFileWriter.h.

References file_.

98  {
99  return file_;
100  }
size_t folly::AsyncFileWriter::getMaxBufferSize ( ) const

Get the maximum buffer size for this AsyncFileWriter, in bytes.

Definition at line 129 of file AsyncFileWriter.cpp.

References folly::data(), and data_.

129  {
130  auto data = data_.lock();
131  return data->maxBufferBytes;
132 }
std::string folly::AsyncFileWriter::getNumDiscardedMsg ( size_t  numDiscarded)
private

Definition at line 228 of file AsyncFileWriter.cpp.

Referenced by folly::AsyncFileWriter::Data::getCurrentQueue(), and performIO().

228  {
229  // We may want to make this customizable in the future (e.g., to allow it to
230  // conform to the LogFormatter style being used).
231  // For now just return a simple fixed message.
232  return folly::to<std::string>(
233  numDiscarded,
234  " log messages discarded: logging faster than we can write\n");
235 }
void folly::AsyncFileWriter::ioThread ( )
private

Definition at line 134 of file AsyncFileWriter.cpp.

References folly::data(), data_, FLAG_IO_THREAD_STOPPED, FLAG_STOP, ioCV_, messageReady_, onIoError(), performIO(), and folly::setThreadName().

Referenced by AsyncFileWriter(), and restartThread().

134  {
135  folly::setThreadName("log_writer");
136 
137  while (true) {
138  // With the lock held, grab a pointer to the current queue, then increment
139  // the ioThreadCounter index so that other threads will write into the
140  // other queue as we process this one.
141  std::vector<std::string>* ioQueue;
142  size_t numDiscarded;
143  {
144  auto data = data_.lock();
145  ioQueue = data->getCurrentQueue();
146  while (ioQueue->empty() && !(data->flags & FLAG_STOP)) {
147  // Wait for a message or one of the above flags to be set.
148  messageReady_.wait(data.getUniqueLock());
149  }
150 
151  if (data->flags & FLAG_STOP) {
152  // We have been asked to stop. We exit immediately in this case
153  // without writing out any pending messages. If we are stopping due
154  // to a fork() the I/O thread will be restarted after the fork (as
155  // long as we are not also being destroyed). If we are stopping due
156  // to the destructor, any remaining messages will be written out
157  // inside the destructor.
158  data->flags |= FLAG_IO_THREAD_STOPPED;
159  data.unlock();
160  ioCV_.notify_all();
161  return;
162  }
163 
164  ++data->ioThreadCounter;
165  numDiscarded = data->numDiscarded;
166  data->numDiscarded = 0;
167  data->currentBufferSize = 0;
168  }
169  ioCV_.notify_all();
170 
171  // Write the log messages now that we have released the lock
172  try {
173  performIO(ioQueue, numDiscarded);
174  } catch (const std::exception& ex) {
175  onIoError(ex);
176  }
177 
178  // clear() empties the vector, but the allocated capacity remains so we can
179  // just reuse it without having to re-allocate in most cases.
180  ioQueue->clear();
181  }
182 }
void folly::AsyncFileWriter::onIoError ( const std::exception &  ex)
private

Definition at line 218 of file AsyncFileWriter.cpp.

References folly::exceptionStr(), folly::File::fd(), file_, and folly::LoggerDB::internalWarning().

Referenced by folly::AsyncFileWriter::Data::getCurrentQueue(), ioThread(), and ~AsyncFileWriter().

218  {
219  LoggerDB::internalWarning(
220  __FILE__,
221  __LINE__,
222  "error writing to log file ",
223  file_.fd(),
224  " in AsyncFileWriter: ",
225  folly::exceptionStr(ex));
226 }
void folly::AsyncFileWriter::performIO ( std::vector< std::string > *  ioQueue,
size_t  numDiscarded 
)
private

Definition at line 184 of file AsyncFileWriter.cpp.

References folly::checkUnixError(), folly::File::fd(), file_, getNumDiscardedMsg(), folly::size(), folly::writeFull(), and folly::writevFull().

Referenced by folly::AsyncFileWriter::Data::getCurrentQueue(), ioThread(), and ~AsyncFileWriter().

186  {
187  // kNumIovecs controls the maximum number of strings we write at once in a
188  // single writev() call.
189  constexpr int kNumIovecs = 64;
190  std::array<iovec, kNumIovecs> iovecs;
191 
192  size_t idx = 0;
193  while (idx < ioQueue->size()) {
194  int numIovecs = 0;
195  while (numIovecs < kNumIovecs && idx < ioQueue->size()) {
196  const auto& str = (*ioQueue)[idx];
197  iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
198  iovecs[numIovecs].iov_len = str.size();
199  ++numIovecs;
200  ++idx;
201  }
202 
203  auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs);
204  folly::checkUnixError(ret, "writeFull() failed");
205  }
206 
207  if (numDiscarded > 0) {
208  auto msg = getNumDiscardedMsg(numDiscarded);
209  if (!msg.empty()) {
210  auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size());
211  // We currently ignore errors from writeFull() here.
212  // There's not much we can really do.
213  (void)ret;
214  }
215  }
216 }
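The batching loop above can be tried outside folly with the POSIX writev() call. The following is a sketch under stated assumptions: writeBatched() is a hypothetical name, it returns the number of writev() calls so the batching is observable, and it treats a short write as an error, whereas folly::writevFull() is the call the real code relies on to complete the write.

```cpp
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <array>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: write every string in `queue` to `fd`, passing at
// most kNumIovecs strings to each writev() call. Returns the number of
// writev() calls made.
inline std::size_t writeBatched(int fd, const std::vector<std::string>& queue) {
  constexpr int kNumIovecs = 64;
  std::array<iovec, kNumIovecs> iovecs;

  std::size_t calls = 0;
  std::size_t idx = 0;
  while (idx < queue.size()) {
    int numIovecs = 0;
    std::size_t expected = 0;
    while (numIovecs < kNumIovecs && idx < queue.size()) {
      const std::string& str = queue[idx];
      // writev() does not modify the buffers; the cast only satisfies the
      // non-const iov_base field.
      iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
      iovecs[numIovecs].iov_len = str.size();
      expected += str.size();
      ++numIovecs;
      ++idx;
    }

    ssize_t ret = ::writev(fd, iovecs.data(), numIovecs);
    // Simplification: no retry on short writes.
    if (ret < 0 || static_cast<std::size_t>(ret) != expected) {
      throw std::runtime_error("writev() failed or wrote fewer bytes than requested");
    }
    ++calls;
  }
  return calls;
}
```

With 130 queued strings the helper issues three writev() calls (64, 64, and 2 entries), which is the point of the batching: one system call per group of messages rather than one per message.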
void folly::AsyncFileWriter::postForkChild ( )
private

Definition at line 264 of file AsyncFileWriter.cpp.

References lockedData_, and restartThread().

Referenced by AsyncFileWriter(), and folly::AsyncFileWriter::Data::getCurrentQueue().

264  {
265  // Clear any messages in the queue. We only want them to be written once,
266  // and we let the parent process handle writing them.
267  lockedData_->queues[0].clear();
268  lockedData_->queues[1].clear();
269 
270  // Restart the I/O thread
271  restartThread();
272 }
void folly::AsyncFileWriter::postForkParent ( )
private

Definition at line 259 of file AsyncFileWriter.cpp.

References restartThread().

Referenced by AsyncFileWriter(), and folly::AsyncFileWriter::Data::getCurrentQueue().

259  {
260  // Restart the I/O thread
261  restartThread();
262 }
bool folly::AsyncFileWriter::preFork ( )
private

Definition at line 237 of file AsyncFileWriter.cpp.

References data_, FLAG_IO_THREAD_STARTED, lockedData_, and stopIoThread().

Referenced by AsyncFileWriter(), and folly::AsyncFileWriter::Data::getCurrentQueue().

237  {
238  // Stop the I/O thread.
239  //
240  // It would perhaps be better to not stop the I/O thread in the parent
241  // process. However, this leaves us in a slightly tricky situation in the
242  // child process where data_->ioThread has been initialized and does not
243  // really point to a valid thread. While we could store it in a union and
244  // replace it without ever calling its destructor, in practice this still has
245  // some tricky corner cases to deal with.
246 
247  // Grab the data lock to ensure no other thread is holding it
248  // while we fork.
249  lockedData_ = data_.lock();
250 
251  // If the I/O thread has been started, stop it now
252  if (lockedData_->flags & FLAG_IO_THREAD_STARTED) {
253  stopIoThread(lockedData_, 0);
254  }
255 
256  return true;
257 }
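preFork() takes the data lock and keeps it until one of the post-fork handlers runs, so no other thread can be holding the lock at the instant the process is copied. The same idea can be sketched with POSIX pthread_atfork(), used here as a stand-in for folly::detail::AtFork. All names in the forksketch namespace are hypothetical, and unlike the folly handlers, handlers installed with pthread_atfork() cannot be unregistered.

```cpp
#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <mutex>

namespace forksketch {

std::mutex stateMutex;  // plays the role of the lock inside data_
int prepareCalls = 0;   // counts how often the prepare handler ran

// Runs in the forking thread just before fork(): acquire the lock so that
// no other thread holds it when the address space is duplicated.
void prepare() {
  stateMutex.lock();
  ++prepareCalls;
}

// Runs after fork() in both the parent and the child: release the lock.
void releaseAfterFork() {
  stateMutex.unlock();
}

inline bool installHandlers() {
  return pthread_atfork(prepare, releaseAfterFork, releaseAfterFork) == 0;
}

// Forks once. The child verifies that it can take the lock and reports
// success through exit status 0; the parent returns that status.
inline int forkAndProbe() {
  pid_t pid = ::fork();
  if (pid < 0) {
    return -1;
  }
  if (pid == 0) {
    stateMutex.lock();
    stateMutex.unlock();
    ::_exit(0);
  }
  int status = 0;
  if (::waitpid(pid, &status, 0) != pid) {
    return -1;
  }
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}

} // namespace forksketch
```

Without the prepare handler, a lock held by some other thread at fork time would stay locked forever in the child, since that other thread does not exist there.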
void folly::AsyncFileWriter::restartThread ( )
private

Definition at line 292 of file AsyncFileWriter.cpp.

References folly::data(), FLAG_DESTROYING, FLAG_IO_THREAD_JOINED, FLAG_IO_THREAD_STARTED, FLAG_IO_THREAD_STOPPED, FLAG_STOP, ioThread(), lockedData_, and folly::gen::move.

Referenced by folly::AsyncFileWriter::Data::getCurrentQueue(), postForkChild(), and postForkParent().

292  {
293  // Move lockedData_ into a local member variable so it will be released
294  // when we return.
295  folly::Synchronized<Data, std::mutex>::LockedPtr data =
296  std::move(lockedData_);
297 
298  if (!(data->flags & FLAG_IO_THREAD_STARTED)) {
299  // Do not start the I/O thread if the constructor has not finished yet
300  return;
301  }
302  if (data->flags & FLAG_DESTROYING) {
303  // Do not restart the I/O thread if we were being destroyed.
304  // If there are more pending messages that need to be flushed the
305  // destructor's stopIoThread() call will handle flushing the messages in
306  // this case.
307  return;
308  }
309 
310  data->flags &= ~(FLAG_STOP | FLAG_IO_THREAD_JOINED | FLAG_IO_THREAD_STOPPED);
311  data->ioThread = std::thread([this] { ioThread(); });
312 }
void folly::AsyncFileWriter::setMaxBufferSize ( size_t  size)

Set the maximum buffer size for this AsyncFileWriter, in bytes.

This controls the upper bound on how much unwritten data will be buffered in memory. If messages are being logged faster than they can be written to the output file, new messages will be discarded if they would cause the amount of buffered data to exceed this limit.

Definition at line 124 of file AsyncFileWriter.cpp.

References folly::data(), data_, and folly::size().

124  {
125  auto data = data_.lock();
126  data->maxBufferBytes = size;
127 }
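The discard policy that this limit controls can be sketched without any threading. The class below is a hypothetical model (BoundedBuffer, add() and drain() are not folly names). It follows the check shown in the writeMessage() listing, which compares the bytes already buffered against the limit before accepting a message, so the buffer can overshoot the limit by one message. The discard note uses the text from getNumDiscardedMsg().

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

class BoundedBuffer {
 public:
  // Mirrors the value of folly::LogWriter::NEVER_DISCARD.
  static constexpr std::uint32_t kNeverDiscard = 0x01;

  explicit BoundedBuffer(std::size_t maxBytes) : maxBytes_(maxBytes) {}

  // Returns false if the message was discarded.
  bool add(std::string msg, std::uint32_t flags = 0) {
    if (currentBytes_ >= maxBytes_ && !(flags & kNeverDiscard)) {
      ++numDiscarded_;
      return false;
    }
    currentBytes_ += msg.size();
    queue_.push_back(std::move(msg));
    return true;
  }

  // Hands back everything buffered so far and resets the counters. If any
  // messages were discarded, a note with the count is appended last.
  std::vector<std::string> drain() {
    std::vector<std::string> out = std::move(queue_);
    queue_.clear();
    if (numDiscarded_ > 0) {
      out.push_back(
          std::to_string(numDiscarded_) +
          " log messages discarded: logging faster than we can write\n");
    }
    currentBytes_ = 0;
    numDiscarded_ = 0;
    return out;
  }

 private:
  std::size_t maxBytes_;
  std::size_t currentBytes_ = 0;
  std::size_t numDiscarded_ = 0;
  std::vector<std::string> queue_;
};
```

With a 4-byte limit, two 3-byte messages are both accepted (the second is admitted because only 3 bytes were buffered when it arrived), a third is discarded, and a message flagged kNeverDiscard is accepted regardless of the limit.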
void folly::AsyncFileWriter::stopIoThread ( folly::Synchronized< Data, std::mutex >::LockedPtr &  data,
uint32_t  extraFlags 
)
private

Definition at line 274 of file AsyncFileWriter.cpp.

References FLAG_IO_THREAD_JOINED, FLAG_IO_THREAD_STOPPED, FLAG_STOP, ioCV_, and messageReady_.

Referenced by folly::AsyncFileWriter::Data::getCurrentQueue(), preFork(), and ~AsyncFileWriter().

276  {
277  data->flags |= (FLAG_STOP | extraFlags);
278  messageReady_.notify_one();
279  ioCV_.wait(data.getUniqueLock(), [&] {
280  return bool(data->flags & FLAG_IO_THREAD_STOPPED);
281  });
282 
283  // Check FLAG_IO_THREAD_JOINED before calling join().
284  // preFork() and the destructor may both run concurrently in separate
285  // threads, and only one should try to join the thread.
286  if ((data->flags & FLAG_IO_THREAD_JOINED) == 0) {
287  data->ioThread.join();
288  data->flags |= FLAG_IO_THREAD_JOINED;
289  }
290 }
bool folly::AsyncFileWriter::ttyOutput ( ) const
overridevirtual

Returns true if the output stream is a tty.

Implements folly::LogWriter.

Definition at line 81 of file AsyncFileWriter.cpp.

References folly::File::fd(), and file_.

81  {
82  return isatty(file_.fd());
83 }
void folly::AsyncFileWriter::writeMessage ( folly::StringPiece  buffer,
uint32_t  flags = 0 
)
overridevirtual

Write a serialized log message.

The flags parameter is a bitwise OR of values from the folly::LogWriter::Flags enum.

Implements folly::LogWriter.

Definition at line 85 of file AsyncFileWriter.cpp.

References folly::Range< Iter >::str().

Referenced by writeThread().

85  {
86  return writeMessage(buffer.str(), flags);
87 }
void folly::AsyncFileWriter::writeMessage ( std::string &&  buffer,
uint32_t  flags = 0 
)
overridevirtual

Write a serialized message.

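This version of writeMessage() accepts a std::string&&. The default implementation in folly::LogWriter calls the StringPiece version of writeMessage(), but subclasses may override it. AsyncFileWriter does so: this overload enqueues the buffer directly, and the StringPiece overload forwards to it.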

Reimplemented from folly::LogWriter.

Definition at line 89 of file AsyncFileWriter.cpp.

References buffer(), folly::data(), data_, messageReady_, folly::gen::move, and folly::LogWriter::NEVER_DISCARD.

89  {
90  auto data = data_.lock();
91  if ((data->currentBufferSize >= data->maxBufferBytes) &&
92  !(flags & NEVER_DISCARD)) {
93  ++data->numDiscarded;
94  return;
95  }
96 
97  data->currentBufferSize += buffer.size();
98  auto* queue = data->getCurrentQueue();
99  queue->emplace_back(std::move(buffer));
100  messageReady_.notify_one();
101 }

Member Data Documentation

folly::File folly::AsyncFileWriter::file_
private

Definition at line 156 of file AsyncFileWriter.h.

Referenced by getFile(), onIoError(), performIO(), and ttyOutput().

std::condition_variable folly::AsyncFileWriter::ioCV_
private

ioCV_ is signaled by the I/O thread each time it increments the ioThreadCounter (once each time around its loop).

Definition at line 168 of file AsyncFileWriter.h.

Referenced by flush(), ioThread(), and stopIoThread().

constexpr size_t folly::AsyncFileWriter::kDefaultMaxBufferSize = 1024 * 1024
static

The default maximum buffer size.

The comments for setMaxBufferSize() explain how this parameter is used.

Definition at line 51 of file AsyncFileWriter.h.

Referenced by TEST().

folly::Synchronized<Data, std::mutex>::LockedPtr folly::AsyncFileWriter::lockedData_
private

lockedData_ exists only to help pass the lock between preFork() and postForkParent()/postForkChild(). We potentially could add some new low-level methods to Synchronized to allow manually locking and unlocking to avoid having to store this object as a member variable.

Definition at line 176 of file AsyncFileWriter.h.

Referenced by postForkChild(), preFork(), and restartThread().

std::condition_variable folly::AsyncFileWriter::messageReady_
private

messageReady_ is signaled by writer threads whenever they add a new message to the current queue.

Definition at line 163 of file AsyncFileWriter.h.

Referenced by flush(), ioThread(), stopIoThread(), and writeMessage().


The documentation for this class was generated from the following files: