proxygen
AsyncFileWriter.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <thread>
21 
22 #include <folly/File.h>
23 #include <folly/Range.h>
24 #include <folly/Synchronized.h>
26 
27 namespace folly {
28 
// AsyncFileWriter is a LogWriter implementation that owns a folly::File and
// hands queued messages to a separate I/O thread (see Data::ioThread and
// ioThread() below) instead of writing them on the caller's thread.
//
// NOTE(review): this listing is incomplete. The embedded line numbers skip
// values, and several names used below (file_, ioThreadCounter, most Flags
// enumerators, the first parameter of stopIoThread) have no visible
// declaration here. Consult the original header before relying on details.
44 class AsyncFileWriter : public LogWriter {
45  public:
    // Default limit for Data::maxBufferBytes: 1024 * 1024 (1 MiB).
51  static constexpr size_t kDefaultMaxBufferSize = 1024 * 1024;
52 
    // Construct a writer from a path.
    // NOTE(review): presumably opens the file at `path` for writing; the
    // definition is not visible here -- confirm the open mode.
57  explicit AsyncFileWriter(folly::StringPiece path);
58 
    // Construct a writer from an existing folly::File, taken by rvalue
    // reference.
62  explicit AsyncFileWriter(folly::File&& file);
63 
65 
    // LogWriter override: accept a message buffer by rvalue reference.
    // `flags` defaults to 0; its meaning is defined by LogWriter, which is
    // not visible in this listing.
67  void writeMessage(std::string&& buffer, uint32_t flags = 0) override;
68 
    // LogWriter override.
    // NOTE(review): presumably blocks until queued messages are written --
    // confirm against the implementation.
73  void flush() override;
74 
    // LogWriter override.
    // NOTE(review): presumably reports whether the output file is a
    // terminal -- confirm.
78  bool ttyOutput() const override;
79 
    // Set the buffer size limit.
    // NOTE(review): presumably updates Data::maxBufferBytes -- confirm.
88  void setMaxBufferSize(size_t size);
89 
    // Get the buffer size limit.
    // NOTE(review): presumably returns Data::maxBufferBytes -- confirm.
93  size_t getMaxBufferSize() const;
94 
    // Return a const reference to the file_ member.
    // NOTE(review): file_ is not declared in the visible part of this
    // listing.
98  const folly::File& getFile() const {
99  return file_;
100  }
101 
102  private:
    // Bit flags describing the state of the I/O thread.
    // NOTE(review): only FLAG_STOP's definition is visible in this listing;
    // the enumerators described by the other comments below are not shown.
103  enum Flags : uint32_t {
104  // FLAG_IO_THREAD_STARTED indicates that the constructor has started the
105  // I/O thread.
107  // FLAG_DESTROYING indicates that the destructor is running and destroying
108  // the I/O thread.
110  // FLAG_STOP indicates that the I/O thread has been asked to stop.
111  // This is set either by the destructor or by preFork().
112  FLAG_STOP = 0x04,
113  // FLAG_IO_THREAD_STOPPED indicates that the I/O thread is about to return
114  // and can now be joined. ioCV_ will be signalled when this flag is set.
116  // FLAG_IO_THREAD_JOINED indicates that the I/O thread has been joined.
118  };
119 
120  /*
121  * A simple implementation using two queues.
122  * All writer threads enqueue into one queue while the I/O thread is
123  * processing the other.
124  *
125  * We could potentially also provide an implementation using folly::MPMCQueue
126  * in the future, which may improve contention under very high write loads.
127  */
128  struct Data {
    // The two message queues described above.
129  std::array<std::vector<std::string>, 2> queues;
    // Buffer size limit; defaults to kDefaultMaxBufferSize.
132  size_t maxBufferBytes{kDefaultMaxBufferSize};
    // NOTE(review): presumably the number of bytes currently queued --
    // confirm.
133  size_t currentBufferSize{0};
    // NOTE(review): presumably the count of messages dropped so far; see
    // getNumDiscardedMsg() -- confirm.
134  size_t numDiscarded{0};
    // Handle for the I/O thread.
135  std::thread ioThread;
136 
    // Return a pointer to the queue selected by the low bit of
    // ioThreadCounter, so the result alternates between queues[0] and
    // queues[1] as the counter changes.
    // NOTE(review): ioThreadCounter is not declared in the visible part of
    // this listing.
137  std::vector<std::string>* getCurrentQueue() {
138  return &queues[ioThreadCounter & 0x1];
139  }
140  };
141 
    // Entry point run by the I/O thread (see FLAG_IO_THREAD_STOPPED above).
142  void ioThread();
    // Process the messages in `ioQueue`; `numDiscarded` is passed alongside.
    // NOTE(review): presumably also reports `numDiscarded` dropped messages
    // via getNumDiscardedMsg() -- confirm.
143  void performIO(std::vector<std::string>* ioQueue, size_t numDiscarded);
144 
    // Handler invoked with the exception from a failed I/O operation.
145  void onIoError(const std::exception& ex);
    // Build a message string from a discarded-message count.
146  std::string getNumDiscardedMsg(size_t numDiscarded);
147 
    // Fork hooks. The FLAG_STOP comment above states that preFork() sets
    // FLAG_STOP.
    // NOTE(review): how these are registered, and what preFork()'s bool
    // return means, is not visible here -- confirm.
148  bool preFork();
149  void postForkParent();
150  void postForkChild();
    // Stop the I/O thread.
    // NOTE(review): the listing skips embedded line 152, so the first
    // parameter is missing here. The symbol index at the end of this page
    // gives the full signature as
    //   stopIoThread(folly::Synchronized<Data, std::mutex>::LockedPtr& data,
    //                uint32_t extraFlags).
151  void stopIoThread(
153  uint32_t extraFlags);
    // NOTE(review): presumably starts a new I/O thread after a stop (e.g.
    // after fork) -- confirm.
154  void restartThread();
155 
157 
    // NOTE(review): presumably signalled when a message has been queued for
    // the I/O thread -- confirm.
163  std::condition_variable messageReady_;
    // Signalled when FLAG_IO_THREAD_STOPPED is set (see the Flags comments).
168  std::condition_variable ioCV_;
169 
177 };
178 } // namespace folly
std::vector< uint8_t > buffer(kBufferSize+16)
void writeMessage(folly::StringPiece buffer, uint32_t flags=0) override
std::condition_variable messageReady_
flags
Definition: http_parser.h:127
const folly::File & getFile() const
folly::Synchronized< Data, std::mutex >::LockedPtr lockedData_
void setMaxBufferSize(size_t size)
std::vector< std::string > * getCurrentQueue()
size_t getMaxBufferSize() const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
AsyncFileWriter(folly::StringPiece path)
typename Base::LockedPtr LockedPtr
Definition: Synchronized.h:453
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
std::condition_variable ioCV_
std::string getNumDiscardedMsg(size_t numDiscarded)
static constexpr size_t kDefaultMaxBufferSize
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
bool ttyOutput() const override
folly::Synchronized< Data, std::mutex > data_
void onIoError(const std::exception &ex)
void performIO(std::vector< std::string > *ioQueue, size_t numDiscarded)
const char * string
Definition: Conv.cpp:212
std::array< std::vector< std::string >, 2 > queues
void stopIoThread(folly::Synchronized< Data, std::mutex >::LockedPtr &data, uint32_t extraFlags)