proxygen
AsyncFileWriter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/Exception.h>
19 #include <folly/FileUtil.h>
20 #include <folly/detail/AtFork.h>
21 #include <folly/logging/LoggerDB.h>
23 
24 using folly::File;
25 using folly::StringPiece;
26 
27 namespace folly {
28 
30 
// Constructor fragments (the signature lines are absent from this listing).
// The initializer below delegates to another AsyncFileWriter constructor,
// passing a File opened write-only, in append mode, created if it is missing.
32  : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {}
// NOTE(review): the call that receives the next four arguments is not visible
// here; the comment below suggests it is registerHandler(). The object is
// passed as `this`, and the three lambdas forward to preFork(),
// postForkParent() and postForkChild() respectively.
35  this,
36  [this] { return preFork(); },
37  [this] { postForkParent(); },
38  [this] { postForkChild(); });
39 
40  // Start the I/O thread after registering the atfork handler.
41  // preFork() may be invoked in another thread as soon as registerHandler()
42  // returns. It will check FLAG_IO_THREAD_STARTED to see if the I/O thread is
43  // running yet.
44  {
45  auto data = data_.lock();
// The flag is set and the thread is created while the data lock is held, so
// the two updates are seen together by anyone who takes the lock afterwards.
46  data->flags |= FLAG_IO_THREAD_STARTED;
47  data->ioThread = std::thread([this] { ioThread(); });
48  }
49 }
50 
// Teardown fragment; the comments in this file indicate it is the destructor
// body (the signature line is absent from this listing). It captures the
// current queue and discard count under the lock, then writes any remaining
// messages itself.
52  std::vector<std::string>* ioQueue;
53  size_t numDiscarded;
54  {
55  // Stop the I/O thread
56  auto data = data_.lock();
// NOTE(review): the statement that actually stops the I/O thread is missing
// from this listing; presumably a stopIoThread() call -- confirm.
58 
59  // stopIoThread() causes the I/O thread to stop as soon as possible,
60  // without waiting for all pending messages to be written. Extract any
61  // remaining messages to write them below.
62  ioQueue = data->getCurrentQueue();
63  numDiscarded = data->numDiscarded;
64  }
// NOTE(review): ioQueue is a raw pointer obtained under the lock and is
// dereferenced below after that scope has ended -- presumably safe because the
// I/O thread has been stopped by this point; confirm no other writer can run.
65 
66  // Unregister the atfork handler after stopping the I/O thread.
67  // preFork(), postForkParent(), and postForkChild() calls can run
68  // concurrently with the destructor until unregisterHandler() returns.
70 
71  // If there are still any pending messages, flush them now.
72  if (!ioQueue->empty()) {
// Exceptions derived from std::exception are routed to onIoError() rather than
// propagating out of this block.
73  try {
74  performIO(ioQueue, numDiscarded);
75  } catch (const std::exception& ex) {
76  onIoError(ex);
77  }
78  }
79 }
80 
// Returns the result of isatty() on the output file's descriptor, i.e. whether
// the log output is a terminal. (The signature line is absent from this
// listing.)
82  return isatty(file_.fd());
83 }
84 
// Convenience overload body (signature line absent from this listing): copies
// `buffer` into a string via str() and forwards it, together with `flags`, to
// the other writeMessage() overload.
86  return writeMessage(buffer.str(), flags);
87 }
88 
// Body of the writeMessage() overload that takes ownership of the message
// (signature line absent from this listing). Appends `buffer` to the current
// queue and wakes the I/O thread, or drops it when the buffer is full.
90  auto data = data_.lock();
// Discard, and count the discard, when the buffered size has already reached
// maxBufferBytes, unless the caller set NEVER_DISCARD. The limit is compared
// against the size *before* this message is added, so the total may end up
// above maxBufferBytes.
91  if ((data->currentBufferSize >= data->maxBufferBytes) &&
92  !(flags & NEVER_DISCARD)) {
93  ++data->numDiscarded;
94  return;
95  }
96 
97  data->currentBufferSize += buffer.size();
98  auto* queue = data->getCurrentQueue();
// `buffer` is moved into the queue; it must not be read after this line.
99  queue->emplace_back(std::move(buffer));
100  messageReady_.notify_one();
101 }
102 
// Blocks the caller until ioThreadCounter has advanced by at least two from
// its value on entry. (Signature line absent from this listing; presumably
// flush() -- confirm.)
104  auto data = data_.lock();
105  auto start = data->ioThreadCounter;
106 
107  // Wait until ioThreadCounter increments by at least two.
108  // Waiting for a single increment is not sufficient, as this happens after
109  // the I/O thread has swapped the queues, which is before it has actually
110  // done the I/O.
111  while (data->ioThreadCounter < start + 2) {
112  // Enqueue an empty string and wake the I/O thread.
113  // The empty string ensures that the I/O thread will break out of its wait
114  // loop and increment the ioThreadCounter, even if there is no other work
115  // to do.
116  data->getCurrentQueue()->emplace_back();
117  messageReady_.notify_one();
118 
119  // Wait for notification from the I/O thread that it has done work.
// The wait is given the data lock; the loop condition is re-evaluated after
// every wake-up, so a wake-up without progress simply enqueues another empty
// string and waits again.
120  ioCV_.wait(data.getUniqueLock());
121  }
122 }
123 
// Stores `size` as the new maxBufferBytes limit while holding the data lock.
// (Signature line absent from this listing.)
125  auto data = data_.lock();
126  data->maxBufferBytes = size;
127 }
128 
// Returns the current maxBufferBytes limit, read while holding the data lock.
// (Signature line absent from this listing.)
130  auto data = data_.lock();
131  return data->maxBufferBytes;
132 }
133 
// Main loop of the background writer thread (signature line absent from this
// listing; the constructor starts it via `ioThread()`). Each iteration waits
// for work, takes the current queue, writes it with performIO() outside the
// lock, and then clears the queue. The loop exits only when FLAG_STOP is set.
135  folly::setThreadName("log_writer");
136 
137  while (true) {
138  // With the lock held, grab a pointer to the current queue, then increment
139  // the ioThreadCounter index so that other threads will write into the
140  // other queue as we process this one.
141  std::vector<std::string>* ioQueue;
142  size_t numDiscarded;
143  {
144  auto data = data_.lock();
145  ioQueue = data->getCurrentQueue();
146  while (ioQueue->empty() && !(data->flags & FLAG_STOP)) {
147  // Wait for a message to arrive or for FLAG_STOP to be set.
148  messageReady_.wait(data.getUniqueLock());
149  }
150 
151  if (data->flags & FLAG_STOP) {
152  // We have been asked to stop. We exit immediately in this case
153  // without writing out any pending messages. If we are stopping due
154  // to a fork() the I/O thread will be restarted after the fork (as
155  // long as we are not also being destroyed). If we are stopping due
156  // to the destructor, any remaining messages will be written out
157  // inside the destructor.
158  data->flags |= FLAG_IO_THREAD_STOPPED;
// The lock is released before notifying so that waiters on ioCV_ can acquire
// it as soon as they wake.
159  data.unlock();
160  ioCV_.notify_all();
161  return;
162  }
163 
// Take ownership of this batch: bump the counter and reset the discard count
// and buffered-size accounting while still holding the lock.
164  ++data->ioThreadCounter;
165  numDiscarded = data->numDiscarded;
166  data->numDiscarded = 0;
167  data->currentBufferSize = 0;
168  }
169  ioCV_.notify_all();
170 
171  // Write the log messages now that we have released the lock
172  try {
173  performIO(ioQueue, numDiscarded);
174  } catch (const std::exception& ex) {
175  onIoError(ex);
176  }
177 
178  // clear() empties the vector, but the allocated capacity remains so we can
179  // just reuse it without having to re-allocate in most cases.
180  ioQueue->clear();
181  }
182 }
183 
// Writes every string in *ioQueue to the output file in batches, then, if any
// messages were discarded, appends a notice built by getNumDiscardedMsg().
// (The first line of the signature is absent from this listing.)
//
// ioQueue:      messages to write; read only, not cleared here.
// numDiscarded: number of messages dropped since the previous write.
// A failed batch write is reported through folly::checkUnixError(); errors
// from writing the discard notice are ignored.
185  std::vector<std::string>* ioQueue,
186  size_t numDiscarded) {
187  // kNumIovecs controls the maximum number of strings we write at once in a
188  // single writev() call.
189  constexpr int kNumIovecs = 64;
190  std::array<iovec, kNumIovecs> iovecs;
191 
192  size_t idx = 0;
193  while (idx < ioQueue->size()) {
// Fill up to kNumIovecs entries, one per queued string, without copying the
// string data.
194  int numIovecs = 0;
195  while (numIovecs < kNumIovecs && idx < ioQueue->size()) {
196  const auto& str = (*ioQueue)[idx];
// iovec::iov_base is a non-const pointer, hence the const_cast; the data is
// only handed to the write call.
197  iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
198  iovecs[numIovecs].iov_len = str.size();
199  ++numIovecs;
200  ++idx;
201  }
202 
203  auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs);
// NOTE(review): the error text below names writeFull(), but the call that can
// fail here is writevFull(); the message looks stale.
204  folly::checkUnixError(ret, "writeFull() failed");
205  }
206 
207  if (numDiscarded > 0) {
208  auto msg = getNumDiscardedMsg(numDiscarded);
209  if (!msg.empty()) {
210  auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size());
211  // We currently ignore errors from writeFull() here.
212  // There's not much we can really do.
213  (void)ret;
214  }
215  }
216 }
217 
// Reports a failed write. The report is built from this source location, the
// output file descriptor and the exception text from folly::exceptionStr().
// NOTE(review): the line naming the function that receives these arguments is
// missing from this listing; the cross-reference index at the end of the page
// lists LoggerDB::internalWarning(file, lineNumber, args...) -- confirm.
218 void AsyncFileWriter::onIoError(const std::exception& ex) {
220  __FILE__,
221  __LINE__,
222  "error writing to log file ",
223  file_.fd(),
224  " in AsyncFileWriter: ",
225  folly::exceptionStr(ex));
226 }
227 
// Builds the one-line notice written after a batch when messages were
// dropped: the decimal count followed by a fixed explanation and a newline.
// (Signature line absent from this listing.)
229  // We may want to make this customizable in the future (e.g., to allow it to
230  // conform to the LogFormatter style being used).
231  // For now just return a simple fixed message.
232  return folly::to<std::string>(
233  numDiscarded,
234  " log messages discarded: logging faster than we can write\n");
235 }
236 
// Pre-fork hook body (signature line absent from this listing; the
// constructor's handler lambda calls `preFork()`). Takes the data lock into
// lockedData_, where it stays held when this function returns, and always
// returns true.
238  // Stop the I/O thread.
239  //
240  // It would perhaps be better to not stop the I/O thread in the parent
241  // process. However, this leaves us in a slightly tricky situation in the
242  // child process where data_->ioThread has been initialized and does not
243  // really point to a valid thread. While we could store it in a union and
244  // replace it without ever calling its destructor, in practice this still has
245  // some tricky corner cases to deal with.
246 
247  // Grab the data lock to ensure no other thread is holding it
248  // while we fork.
249  lockedData_ = data_.lock();
250 
251  // If the I/O thread has been started, stop it now
252  if (lockedData_->flags & FLAG_IO_THREAD_STARTED) {
// NOTE(review): the statement inside this branch is missing from this
// listing; presumably a stopIoThread() call on lockedData_ -- confirm.
254  }
255 
256  return true;
257 }
258 
// Post-fork hook body for the parent process (signature line absent from this
// listing; presumably postForkParent() -- confirm). Pending messages are left
// in place and the I/O thread is restarted.
260  // Restart the I/O thread
261  restartThread();
262 }
263 
// Post-fork hook body for the child process (signature line absent from this
// listing; presumably postForkChild() -- confirm). Empties both queues through
// lockedData_ and then restarts the I/O thread.
265  // Clear any messages in the queue. We only want them to be written once,
266  // and we let the parent process handle writing them.
267  lockedData_->queues[0].clear();
268  lockedData_->queues[1].clear();
269 
270  // Restart the I/O thread
271  restartThread();
272 }
273 
// Asks the I/O thread to stop and waits until it has acknowledged, then joins
// it if nobody has joined it yet. (The first lines of the signature are absent
// from this listing; `data` is the caller's locked-data handle.)
//
// extraFlags: additional flag bits OR-ed into data->flags along with
//             FLAG_STOP.
276  uint32_t extraFlags) {
277  data->flags |= (FLAG_STOP | extraFlags);
278  messageReady_.notify_one();
// Block until the I/O thread has set FLAG_IO_THREAD_STOPPED; the predicate
// form re-checks the flag after every wake-up.
279  ioCV_.wait(data.getUniqueLock(), [&] {
280  return bool(data->flags & FLAG_IO_THREAD_STOPPED);
281  });
282 
283  // Check FLAG_IO_THREAD_JOINED before calling join().
284  // preFork() and the destructor may both run concurrently in separate
285  // threads, and only one should try to join the thread.
286  if ((data->flags & FLAG_IO_THREAD_JOINED) == 0) {
// join() runs with the data lock held; the I/O thread unlocks before it
// notifies and returns in its FLAG_STOP branch, so it does not need the lock
// in order to finish.
287  data->ioThread.join();
288  data->flags |= FLAG_IO_THREAD_JOINED;
289  }
290 }
291 
// Restarts the I/O thread after a fork unless the writer was never fully
// started or is being destroyed. (Signature line absent from this listing;
// the post-fork hooks call `restartThread()`.)
293  // Move lockedData_ into a local variable so it will be released
294  // when we return.
// NOTE(review): the statement that defines the local `data` used below is
// missing from this listing.
297 
298  if (!(data->flags & FLAG_IO_THREAD_STARTED)) {
299  // Do not start the I/O thread if the constructor has not finished yet
300  return;
301  }
302  if (data->flags & FLAG_DESTROYING) {
303  // Do not restart the I/O thread if we were being destroyed.
304  // If there are more pending messages that need to be flushed the
305  // destructor's stopIoThread() call will handle flushing the messages in
306  // this case.
307  return;
308  }
309 
// NOTE(review): one statement is missing from this listing just before the
// thread is re-created; presumably it clears the stop/joined flags -- confirm
// against the original file.
311  data->ioThread = std::thread([this] { ioThread(); });
312 }
313 
314 } // namespace folly
std::vector< uint8_t > buffer(kBufferSize+16)
void writeMessage(folly::StringPiece buffer, uint32_t flags=0) override
std::condition_variable messageReady_
flags
Definition: http_parser.h:127
std::string str() const
Definition: Range.h:591
folly::Synchronized< Data, std::mutex >::LockedPtr lockedData_
void setMaxBufferSize(size_t size)
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
size_t getMaxBufferSize() const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
AsyncFileWriter(folly::StringPiece path)
ssize_t writevFull(int fd, iovec *iov, int count)
Definition: FileUtil.cpp:150
typename Base::LockedPtr LockedPtr
Definition: Synchronized.h:453
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
int fd() const
Definition: File.h:85
std::condition_variable ioCV_
std::string getNumDiscardedMsg(size_t numDiscarded)
static constexpr size_t kDefaultMaxBufferSize
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
ssize_t writeFull(int fd, const void *buf, size_t count)
Definition: FileUtil.cpp:134
bool ttyOutput() const override
folly::Synchronized< Data, std::mutex > data_
auto start
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
void onIoError(const std::exception &ex)
void performIO(std::vector< std::string > *ioQueue, size_t numDiscarded)
const char * string
Definition: Conv.cpp:212
static void unregisterHandler(void *object)
Definition: AtFork.cpp:118
Range< const char * > StringPiece
static void registerHandler(void *object, folly::Function< bool()> prepare, folly::Function< void()> parent, folly::Function< void()> child)
Definition: AtFork.cpp:108
void stopIoThread(folly::Synchronized< Data, std::mutex >::LockedPtr &data, uint32_t extraFlags)
static void internalWarning(folly::StringPiece file, int lineNumber, Args &&...args) noexcept
Definition: LoggerDB.h:201