2 * Copyright 2004-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/Conv.h>
17 #include <folly/Exception.h>
18 #include <folly/File.h>
19 #include <folly/FileUtil.h>
20 #include <folly/String.h>
21 #include <folly/experimental/TestUtil.h>
22 #include <folly/experimental/logging/AsyncFileWriter.h>
23 #include <folly/experimental/logging/LoggerDB.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Unistd.h>
30 async_discard_num_writer_threads,
32 "number of threads to use to generate log messages during "
33 "the AsyncFileWriter.discard test");
35 async_discard_messages_per_writer,
37 "number of messages each writer threads should generate in "
38 "the AsyncFileWriter.discard test");
40 async_discard_read_sleep_usec,
42 "how long the read thread should sleep between reads in "
43 "the AsyncFileWriter.discard test");
45 using namespace folly;
46 using folly::test::TemporaryFile;
48 TEST(AsyncFileWriter, noMessages) {
49 TemporaryFile tmpFile{"logging_test"};
51 // Test the simple construction and destruction of an AsyncFileWriter
52 // without ever writing any messages. This still exercises the I/O
53 // thread start-up and shutdown code.
54 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
57 TEST(AsyncFileWriter, simpleMessages) {
58 TemporaryFile tmpFile{"logging_test"};
61 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
62 for (int n = 0; n < 10; ++n) {
63 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
69 auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
72 std::string expected =
83 EXPECT_EQ(expected, data);
88 static std::vector<std::string>* internalWarnings;
90 void handleLoggingError(
91 StringPiece /* file */,
94 internalWarnings->emplace_back(std::move(msg));
98 TEST(AsyncFileWriter, ioError) {
99 // Set the LoggerDB internal warning handler so we can record the messages
100 std::vector<std::string> logErrors;
101 internalWarnings = &logErrors;
102 LoggerDB::setInternalWarningHandler(handleLoggingError);
104 // Create an AsyncFileWriter that refers to a pipe whose read end is closed
105 std::array<int, 2> fds;
106 auto rc = pipe(fds.data());
107 folly::checkUnixError(rc, "failed to create pipe");
108 signal(SIGPIPE, SIG_IGN);
111 // Log a bunch of messages to the writer
112 size_t numMessages = 100;
114 AsyncFileWriter writer{folly::File{fds[1], true}};
115 for (size_t n = 0; n < numMessages; ++n) {
116 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
121 LoggerDB::setInternalWarningHandler(nullptr);
123 // AsyncFileWriter should have some internal warning messages about the
124 // log failures. This will generally be many fewer than the number of
125 // messages we wrote, though, since it performs write batching.
126 for (const auto& msg : logErrors) {
129 testing::ContainsRegex(
130 "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
132 EXPECT_GT(logErrors.size(), 0);
133 EXPECT_LE(logErrors.size(), numMessages);
138 * writeThread() writes a series of messages to the AsyncFileWriter
140 void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
141 for (size_t n = 0; n < numMessages; ++n) {
142 writer->writeMessage(
143 folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
149 void check(size_t numThreads, size_t messagesPerThread) {
150 EXPECT_EQ("", trailingData_);
151 EXPECT_EQ(numThreads, writers_.size());
152 size_t totalMessagesReceived = 0;
153 for (const auto& writerData : writers_) {
154 EXPECT_LE(writerData.numMessages, messagesPerThread);
155 EXPECT_LE(writerData.lastId, messagesPerThread);
156 totalMessagesReceived += writerData.numMessages;
159 EXPECT_EQ(0, numUnableToParse_);
160 EXPECT_EQ(0, numOutOfOrder_);
162 numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
165 void messageReceived(StringPiece msg) {
166 if (msg.endsWith(" log messages discarded: "
167 "logging faster than we can write")) {
168 auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
169 fprintf(stderr, "received discard notification: %zu\n", discardCount);
170 numDiscarded_ += discardCount;
175 size_t messageIndex = 0;
177 parseMessage(msg, &threadID, &messageIndex);
178 } catch (const std::exception& ex) {
182 "unable to parse log message: %s\n",
183 folly::humanify(msg.str()).c_str());
187 if (threadID >= writers_.size()) {
188 writers_.resize(threadID + 1);
190 writers_[threadID].numMessages++;
191 if (messageIndex > writers_[threadID].lastId) {
192 writers_[threadID].lastId = messageIndex;
197 "received out-of-order messages from writer %zu: "
198 "%zu received after %zu\n",
201 writers_[threadID].lastId);
205 void trailingData(StringPiece data) {
206 trailingData_ = data.str();
211 size_t numMessages{0};
215 void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
216 constexpr StringPiece prefix{"thread "};
217 constexpr StringPiece middle{" message "};
218 if (!msg.startsWith(prefix)) {
219 throw std::runtime_error("bad message prefix");
222 auto idx = prefix.size();
223 auto end = msg.find(' ', idx);
224 if (end == StringPiece::npos) {
225 throw std::runtime_error("no middle found");
228 *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
229 auto rest = msg.subpiece(end);
230 if (!rest.startsWith(middle)) {
231 throw std::runtime_error("bad message middle");
234 rest.advance(middle.size());
235 *messageIndex = folly::to<size_t>(rest);
238 std::vector<WriterStats> writers_;
239 std::string trailingData_;
240 size_t numUnableToParse_{0};
241 size_t numOutOfOrder_{0};
242 size_t numDiscarded_{0};
246 * readThread() reads messages slowly from a pipe. This helps test the
247 * AsyncFileWriter behavior when I/O is slow.
249 void readThread(folly::File&& file, ReadStats* stats) {
250 std::vector<char> buffer;
253 size_t bufferIdx = 0;
256 usleep(FLAGS_async_discard_read_sleep_usec);
258 auto readResult = folly::readNoInt(
259 file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
260 if (readResult < 0) {
261 fprintf(stderr, "error reading from pipe: %d\n", errno);
264 if (readResult == 0) {
265 fprintf(stderr, "read EOF\n");
269 auto logDataLen = bufferIdx + readResult;
270 StringPiece logData{buffer.data(), logDataLen};
273 auto end = logData.find('\n', idx);
274 if (end == StringPiece::npos) {
275 bufferIdx = logDataLen - idx;
276 memmove(buffer.data(), buffer.data() + idx, bufferIdx);
280 StringPiece logMsg{logData.data() + idx, end - idx};
281 stats->messageReceived(logMsg);
286 if (bufferIdx != 0) {
287 stats->trailingData(StringPiece{buffer.data(), bufferIdx});
292 * The discard test spawns a number of threads that each write a large number
293 * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
294 * thread reads from it slowly, causing a backlog to build up.
296 * The test then checks that:
297 * - The read thread always receives full messages (no partial log messages)
298 * - Messages that are received are received in order
299 * - The number of messages received plus the number reported in discard
300 * notifications matches the number of messages sent.
302 TEST(AsyncFileWriter, discard) {
303 std::array<int, 2> fds;
304 auto pipeResult = pipe(fds.data());
305 folly::checkUnixError(pipeResult, "pipe failed");
306 folly::File readPipe{fds[0], true};
307 folly::File writePipe{fds[1], true};
310 std::thread reader(readThread, std::move(readPipe), &readStats);
312 AsyncFileWriter writer{std::move(writePipe)};
314 std::vector<std::thread> writeThreads;
315 for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
316 writeThreads.emplace_back(
317 writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
320 for (auto& t : writeThreads) {
323 fprintf(stderr, "writers done\n");
327 FLAGS_async_discard_num_writer_threads,
328 FLAGS_async_discard_messages_per_writer);