2 * Copyright 2004-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/experimental/logging/AsyncFileWriter.h>
18 #include <folly/Exception.h>
19 #include <folly/FileUtil.h>
20 #include <folly/experimental/logging/LoggerDB.h>
21 #include <folly/system/ThreadName.h>
24 using folly::StringPiece;
28 constexpr size_t AsyncFileWriter::kDefaultMaxBufferSize;
30 AsyncFileWriter::AsyncFileWriter(StringPiece path)
31 : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {}
33 AsyncFileWriter::AsyncFileWriter(folly::File&& file)
34 : file_{std::move(file)}, ioThread_([this] { ioThread(); }) {}
36 AsyncFileWriter::~AsyncFileWriter() {
38 messageReady_.notify_one();
42 void AsyncFileWriter::writeMessage(StringPiece buffer, uint32_t flags) {
43 return writeMessage(buffer.str(), flags);
46 void AsyncFileWriter::writeMessage(std::string&& buffer, uint32_t flags) {
47 auto data = data_.lock();
48 if ((data->currentBufferSize >= data->maxBufferBytes) &&
49 !(flags & NEVER_DISCARD)) {
54 data->currentBufferSize += buffer.size();
55 auto* queue = data->getCurrentQueue();
56 queue->emplace_back(std::move(buffer));
57 messageReady_.notify_one();
60 void AsyncFileWriter::flush() {
61 auto data = data_.lock();
62 auto start = data->ioThreadCounter;
64 // Wait until ioThreadCounter increments by at least two.
65 // Waiting for a single increment is not sufficient, as this happens after
66 // the I/O thread has swapped the queues, which is before it has actually
68 while (data->ioThreadCounter < start + 2) {
69 if (data->ioThreadDone) {
73 // Enqueue an empty string and wake the I/O thread.
74 // The empty string ensures that the I/O thread will break out of its wait
75 // loop and increment the ioThreadCounter, even if there is no other work
77 data->getCurrentQueue()->emplace_back();
78 messageReady_.notify_one();
80 // Wait for notification from the I/O thread that it has done work.
81 ioCV_.wait(data.getUniqueLock());
85 void AsyncFileWriter::setMaxBufferSize(size_t size) {
86 auto data = data_.lock();
87 data->maxBufferBytes = size;
90 size_t AsyncFileWriter::getMaxBufferSize() const {
91 auto data = data_.lock();
92 return data->maxBufferBytes;
95 void AsyncFileWriter::ioThread() {
96 folly::setThreadName("log_writer");
99 // With the lock held, grab a pointer to the current queue, then increment
100 // the ioThreadCounter index so that other threads will write into the
101 // other queue as we process this one.
102 std::vector<std::string>* ioQueue;
106 auto data = data_.lock();
107 ioQueue = data->getCurrentQueue();
108 while (ioQueue->empty() && !data->stop) {
109 messageReady_.wait(data.getUniqueLock());
112 ++data->ioThreadCounter;
113 numDiscarded = data->numDiscarded;
114 data->numDiscarded = 0;
115 data->currentBufferSize = 0;
120 // Write the log messages now that we have released the lock
123 } catch (const std::exception& ex) {
127 // clear() empties the vector, but the allocated capacity remains so we can
128 // just reuse it without having to re-allocate in most cases.
131 if (numDiscarded > 0) {
132 auto msg = getNumDiscardedMsg(numDiscarded);
134 auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size());
135 // We currently ignore errors from writeFull() here.
136 // There's not much we can really do.
142 data_->ioThreadDone = true;
148 void AsyncFileWriter::performIO(std::vector<std::string>* ioQueue) {
149 // kNumIovecs controls the maximum number of strings we write at once in a
150 // single writev() call.
151 constexpr int kNumIovecs = 64;
152 std::array<iovec, kNumIovecs> iovecs;
155 while (idx < ioQueue->size()) {
157 while (numIovecs < kNumIovecs && idx < ioQueue->size()) {
158 const auto& str = (*ioQueue)[idx];
159 iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
160 iovecs[numIovecs].iov_len = str.size();
165 auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs);
166 folly::checkUnixError(ret, "writeFull() failed");
170 void AsyncFileWriter::onIoError(const std::exception& ex) {
171 LoggerDB::internalWarning(
174 "error writing to log file ",
176 " in AsyncFileWriter: ",
177 folly::exceptionStr(ex));
180 std::string AsyncFileWriter::getNumDiscardedMsg(size_t numDiscarded) {
181 // We may want to make this customizable in the future (e.g., to allow it to
182 // conform to the LogFormatter style being used).
183 // For now just return a simple fixed message.
184 return folly::to<std::string>(
186 " log messages discarded: logging faster than we can write\n");