52 std::vector<std::string>* ioQueue;
62 ioQueue =
data->getCurrentQueue();
63 numDiscarded =
data->numDiscarded;
72 if (!ioQueue->empty()) {
75 }
catch (
const std::exception& ex) {
91 if ((
data->currentBufferSize >=
data->maxBufferBytes) &&
98 auto* queue =
data->getCurrentQueue();
111 while (
data->ioThreadCounter <
start + 2) {
116 data->getCurrentQueue()->emplace_back();
131 return data->maxBufferBytes;
141 std::vector<std::string>* ioQueue;
145 ioQueue =
data->getCurrentQueue();
164 ++
data->ioThreadCounter;
165 numDiscarded =
data->numDiscarded;
166 data->numDiscarded = 0;
167 data->currentBufferSize = 0;
174 }
catch (
const std::exception& ex) {
185 std::vector<std::string>* ioQueue,
186 size_t numDiscarded) {
189 constexpr
int kNumIovecs = 64;
190 std::array<iovec, kNumIovecs> iovecs;
193 while (idx < ioQueue->
size()) {
195 while (numIovecs < kNumIovecs && idx < ioQueue->
size()) {
196 const auto& str = (*ioQueue)[idx];
197 iovecs[numIovecs].iov_base =
const_cast<char*
>(str.data());
198 iovecs[numIovecs].iov_len = str.size();
207 if (numDiscarded > 0) {
222 "error writing to log file ",
224 " in AsyncFileWriter: ",
232 return folly::to<std::string>(
234 " log messages discarded: logging faster than we can write\n");
279 ioCV_.wait(data.getUniqueLock(), [&] {
287 data->ioThread.join();
311 data->ioThread = std::thread([
this] {
ioThread(); });
std::vector< uint8_t > buffer(kBufferSize+16)
void writeMessage(folly::StringPiece buffer, uint32_t flags=0) override
std::condition_variable messageReady_
folly::Synchronized< Data, std::mutex >::LockedPtr lockedData_
void setMaxBufferSize(size_t size)
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
size_t getMaxBufferSize() const
—— Concurrent Priority Queue Implementation ——
AsyncFileWriter(folly::StringPiece path)
ssize_t writevFull(int fd, iovec *iov, int count)
typename Base::LockedPtr LockedPtr
constexpr auto size(C const &c) -> decltype(c.size())
std::condition_variable ioCV_
std::string getNumDiscardedMsg(size_t numDiscarded)
static constexpr size_t kDefaultMaxBufferSize
constexpr auto data(C &c) -> decltype(c.data())
ssize_t writeFull(int fd, const void *buf, size_t count)
bool ttyOutput() const override
folly::Synchronized< Data, std::mutex > data_
void checkUnixError(ssize_t ret, Args &&...args)
bool setThreadName(std::thread::id tid, StringPiece name)
void onIoError(const std::exception &ex)
void performIO(std::vector< std::string > *ioQueue, size_t numDiscarded)
static void unregisterHandler(void *object)
Range< const char * > StringPiece
static void registerHandler(void *object, folly::Function< bool()> prepare, folly::Function< void()> parent, folly::Function< void()> child)
void stopIoThread(folly::Synchronized< Data, std::mutex >::LockedPtr &data, uint32_t extraFlags)
static void internalWarning(folly::StringPiece file, int lineNumber, Args &&...args) noexcept