Adds writer test case for RCU
[folly.git] / folly / experimental / logging / AsyncFileWriter.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/logging/AsyncFileWriter.h>
17
18 #include <folly/Exception.h>
19 #include <folly/FileUtil.h>
20 #include <folly/experimental/logging/LoggerDB.h>
21 #include <folly/system/ThreadName.h>
22
23 using folly::File;
24 using folly::StringPiece;
25
26 namespace folly {
27
28 constexpr size_t AsyncFileWriter::kDefaultMaxBufferSize;
29
30 AsyncFileWriter::AsyncFileWriter(StringPiece path)
31     : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {}
32
33 AsyncFileWriter::AsyncFileWriter(folly::File&& file)
34     : file_{std::move(file)}, ioThread_([this] { ioThread(); }) {}
35
36 AsyncFileWriter::~AsyncFileWriter() {
37   data_->stop = true;
38   messageReady_.notify_one();
39   ioThread_.join();
40 }
41
42 void AsyncFileWriter::writeMessage(StringPiece buffer, uint32_t flags) {
43   return writeMessage(buffer.str(), flags);
44 }
45
46 void AsyncFileWriter::writeMessage(std::string&& buffer, uint32_t flags) {
47   auto data = data_.lock();
48   if ((data->currentBufferSize >= data->maxBufferBytes) &&
49       !(flags & NEVER_DISCARD)) {
50     ++data->numDiscarded;
51     return;
52   }
53
54   data->currentBufferSize += buffer.size();
55   auto* queue = data->getCurrentQueue();
56   queue->emplace_back(std::move(buffer));
57   messageReady_.notify_one();
58 }
59
60 void AsyncFileWriter::flush() {
61   auto data = data_.lock();
62   auto start = data->ioThreadCounter;
63
64   // Wait until ioThreadCounter increments by at least two.
65   // Waiting for a single increment is not sufficient, as this happens after
66   // the I/O thread has swapped the queues, which is before it has actually
67   // done the I/O.
68   while (data->ioThreadCounter < start + 2) {
69     if (data->ioThreadDone) {
70       return;
71     }
72
73     // Enqueue an empty string and wake the I/O thread.
74     // The empty string ensures that the I/O thread will break out of its wait
75     // loop and increment the ioThreadCounter, even if there is no other work
76     // to do.
77     data->getCurrentQueue()->emplace_back();
78     messageReady_.notify_one();
79
80     // Wait for notification from the I/O thread that it has done work.
81     ioCV_.wait(data.getUniqueLock());
82   }
83 }
84
85 void AsyncFileWriter::setMaxBufferSize(size_t size) {
86   auto data = data_.lock();
87   data->maxBufferBytes = size;
88 }
89
90 size_t AsyncFileWriter::getMaxBufferSize() const {
91   auto data = data_.lock();
92   return data->maxBufferBytes;
93 }
94
95 void AsyncFileWriter::ioThread() {
96   folly::setThreadName("log_writer");
97
98   while (true) {
99     // With the lock held, grab a pointer to the current queue, then increment
100     // the ioThreadCounter index so that other threads will write into the
101     // other queue as we process this one.
102     std::vector<std::string>* ioQueue;
103     size_t numDiscarded;
104     bool stop;
105     {
106       auto data = data_.lock();
107       ioQueue = data->getCurrentQueue();
108       while (ioQueue->empty() && !data->stop) {
109         messageReady_.wait(data.getUniqueLock());
110       }
111
112       ++data->ioThreadCounter;
113       numDiscarded = data->numDiscarded;
114       data->numDiscarded = 0;
115       data->currentBufferSize = 0;
116       stop = data->stop;
117     }
118     ioCV_.notify_all();
119
120     // Write the log messages now that we have released the lock
121     try {
122       performIO(ioQueue);
123     } catch (const std::exception& ex) {
124       onIoError(ex);
125     }
126
127     // clear() empties the vector, but the allocated capacity remains so we can
128     // just reuse it without having to re-allocate in most cases.
129     ioQueue->clear();
130
131     if (numDiscarded > 0) {
132       auto msg = getNumDiscardedMsg(numDiscarded);
133       if (!msg.empty()) {
134         auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size());
135         // We currently ignore errors from writeFull() here.
136         // There's not much we can really do.
137         (void)ret;
138       }
139     }
140
141     if (stop) {
142       data_->ioThreadDone = true;
143       break;
144     }
145   }
146 }
147
148 void AsyncFileWriter::performIO(std::vector<std::string>* ioQueue) {
149   // kNumIovecs controls the maximum number of strings we write at once in a
150   // single writev() call.
151   constexpr int kNumIovecs = 64;
152   std::array<iovec, kNumIovecs> iovecs;
153
154   size_t idx = 0;
155   while (idx < ioQueue->size()) {
156     int numIovecs = 0;
157     while (numIovecs < kNumIovecs && idx < ioQueue->size()) {
158       const auto& str = (*ioQueue)[idx];
159       iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
160       iovecs[numIovecs].iov_len = str.size();
161       ++numIovecs;
162       ++idx;
163     }
164
165     auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs);
166     folly::checkUnixError(ret, "writeFull() failed");
167   }
168 }
169
170 void AsyncFileWriter::onIoError(const std::exception& ex) {
171   LoggerDB::internalWarning(
172       __FILE__,
173       __LINE__,
174       "error writing to log file ",
175       file_.fd(),
176       " in AsyncFileWriter: ",
177       folly::exceptionStr(ex));
178 }
179
180 std::string AsyncFileWriter::getNumDiscardedMsg(size_t numDiscarded) {
181   // We may want to make this customizable in the future (e.g., to allow it to
182   // conform to the LogFormatter style being used).
183   // For now just return a simple fixed message.
184   return folly::to<std::string>(
185       numDiscarded,
186       " log messages discarded: logging faster than we can write\n");
187 }
188 } // namespace folly