/*
 * Copyright 2004-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <folly/Conv.h>
#include <folly/Exception.h>
#include <folly/File.h>
#include <folly/FileUtil.h>
#include <folly/String.h>
#include <folly/Synchronized.h>
#include <folly/experimental/TestUtil.h>
#include <folly/experimental/logging/AsyncFileWriter.h>
#include <folly/experimental/logging/Init.h>
#include <folly/experimental/logging/LoggerDB.h>
#include <folly/experimental/logging/xlog.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/init/Init.h>
#include <folly/lang/SafeAssert.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
38 DEFINE_string(logging, "", "folly::logging configuration");
40 async_discard_num_normal_writers,
42 "number of threads to use to generate normal log messages during "
43 "the AsyncFileWriter.discard test");
45 async_discard_num_nodiscard_writers,
47 "number of threads to use to generate non-discardable log messages during "
48 "the AsyncFileWriter.discard test");
50 async_discard_read_sleep_usec,
52 "how long the read thread should sleep between reads in "
53 "the AsyncFileWriter.discard test");
55 async_discard_timeout_msec,
57 "A timeout for the AsyncFileWriter.discard test if it cannot generate "
60 async_discard_num_events,
62 "The number of discard events to wait for in the AsyncFileWriter.discard "
65 using namespace folly;
66 using namespace std::literals::chrono_literals;
67 using folly::test::TemporaryFile;
68 using std::chrono::steady_clock;
69 using std::chrono::milliseconds;
71 TEST(AsyncFileWriter, noMessages) {
72 TemporaryFile tmpFile{"logging_test"};
74 // Test the simple construction and destruction of an AsyncFileWriter
75 // without ever writing any messages. This still exercises the I/O
76 // thread start-up and shutdown code.
77 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
80 TEST(AsyncFileWriter, simpleMessages) {
81 TemporaryFile tmpFile{"logging_test"};
84 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
85 for (int n = 0; n < 10; ++n) {
86 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
93 auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
96 std::string expected =
107 EXPECT_EQ(expected, data);
111 static std::vector<std::string>* internalWarnings;
113 void handleLoggingError(
114 StringPiece /* file */,
115 int /* lineNumber */,
117 internalWarnings->emplace_back(std::move(msg));
121 TEST(AsyncFileWriter, ioError) {
122 // Set the LoggerDB internal warning handler so we can record the messages
123 std::vector<std::string> logErrors;
124 internalWarnings = &logErrors;
125 LoggerDB::setInternalWarningHandler(handleLoggingError);
127 // Create an AsyncFileWriter that refers to a pipe whose read end is closed
128 std::array<int, 2> fds;
129 auto rc = pipe(fds.data());
130 folly::checkUnixError(rc, "failed to create pipe");
132 signal(SIGPIPE, SIG_IGN);
136 // Log a bunch of messages to the writer
137 size_t numMessages = 100;
139 AsyncFileWriter writer{folly::File{fds[1], true}};
140 for (size_t n = 0; n < numMessages; ++n) {
141 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
146 LoggerDB::setInternalWarningHandler(nullptr);
148 // AsyncFileWriter should have some internal warning messages about the
149 // log failures. This will generally be many fewer than the number of
150 // messages we wrote, though, since it performs write batching.
152 // GTest on Windows doesn't support alternation in the regex syntax -_-....
153 const std::string kExpectedErrorMessage =
155 // The `pipe` call above is actually implemented via sockets, so we get
156 // a different error message.
157 "An established connection was aborted by the software in your host machine\\.";
162 for (const auto& msg : logErrors) {
165 testing::ContainsRegex(
166 "error writing to log file .* in AsyncFileWriter.*: " +
167 kExpectedErrorMessage));
169 EXPECT_GT(logErrors.size(), 0);
170 EXPECT_LE(logErrors.size(), numMessages);
174 size_t fillUpPipe(int fd) {
175 int flags = fcntl(fd, F_GETFL);
176 folly::checkUnixError(flags, "failed get file descriptor flags");
177 auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
178 folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
179 std::vector<char> data;
181 size_t totalBytes = 0;
182 size_t bytesToWrite = data.size();
184 auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
185 if (bytesWritten < 0) {
186 if (errno == EAGAIN || errno == EWOULDBLOCK) {
187 // We blocked. Keep trying smaller writes, until we get down to a
188 // single byte, just to make sure the logging code really won't be able
189 // to write anything to the pipe.
190 if (bytesToWrite <= 1) {
196 throwSystemError("error writing to pipe");
199 totalBytes += bytesWritten;
202 XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
204 rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
205 folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
211 TEST(AsyncFileWriter, flush) {
212 // Set up a pipe(), then write data to the write endpoint until it fills up
213 // and starts blocking.
214 std::array<int, 2> fds;
215 auto rc = pipe(fds.data());
216 folly::checkUnixError(rc, "failed to create pipe");
217 File readPipe{fds[0], true};
218 File writePipe{fds[1], true};
220 auto paddingSize = fillUpPipe(writePipe.fd());
222 // Now set up an AsyncFileWriter pointing at the write end of the pipe
223 AsyncFileWriter writer{std::move(writePipe)};
226 writer.writeMessage("test message: " + std::string(200, 'x'));
228 // Call flush(). Use a separate thread, since this should block until we
229 // consume data from the pipe.
230 Promise<Unit> promise;
231 auto future = promise.getFuture();
232 auto flushFunction = [&] { writer.flush(); };
233 std::thread flushThread{
234 [&]() { promise.setTry(makeTryWith(flushFunction)); }};
235 // Detach the flush thread now rather than joining it at the end of the
236 // function. This way if something goes wrong during the test we will fail
237 // with the real error, rather than crashing due to the std::thread
238 // destructor running on a still-joinable thread.
239 flushThread.detach();
241 // Sleep briefly, and make sure flush() still hasn't completed.
242 // If it has completed this doesn't necessarily indicate a bug in
243 // AsyncFileWriter, but instead indicates that our test code failed to
244 // successfully cause a blocking write.
246 std::this_thread::sleep_for(10ms);
247 EXPECT_FALSE(future.isReady());
249 // Now read from the pipe
250 std::vector<char> buf;
251 buf.resize(paddingSize);
252 readFull(readPipe.fd(), buf.data(), buf.size());
254 // Make sure flush completes successfully now
258 // A large-ish message suffix, just to consume space and help fill up
259 // log buffers faster.
260 static constexpr StringPiece kMsgSuffix{
261 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
262 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
263 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
264 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
269 : deadline_{steady_clock::now() +
270 milliseconds{FLAGS_async_discard_timeout_msec}},
271 readSleepUS_{static_cast<uint64_t>(
272 std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
274 void clearSleepDuration() {
275 readSleepUS_.store(0);
277 std::chrono::microseconds getSleepUS() const {
278 return std::chrono::microseconds{readSleepUS_.load()};
281 bool shouldWriterStop() const {
282 // Stop after we have seen the required number of separate discard events.
283 // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
284 // ensures the async writer blocks and then makes progress again multiple
286 if (FLAGS_async_discard_num_events > 0 &&
287 discardEventsSeen_.load() >
288 static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
292 // Stop after a timeout, even if we don't hit the number of requested
294 return steady_clock::now() > deadline_;
296 void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
297 auto map = perThreadWriteData_.wlock();
299 map->find(threadID) == map->end(),
300 "multiple writer threads with same ID");
301 auto& data = (*map)[threadID];
302 data.numMessagesWritten = messagesWritten;
307 auto writeDataMap = perThreadWriteData_.wlock();
309 EXPECT_EQ("", trailingData_);
310 EXPECT_EQ(0, numUnableToParse_);
311 EXPECT_EQ(0, numOutOfOrder_);
313 // Check messages received from each writer thread
314 size_t readerStatsChecked = 0;
315 size_t totalMessagesWritten = 0;
316 size_t totalMessagesRead = 0;
317 for (const auto& writeEntry : *writeDataMap) {
318 const auto& writeInfo = writeEntry.second;
319 totalMessagesWritten += writeInfo.numMessagesWritten;
321 auto iter = perThreadReadData_.find(writeEntry.first);
322 if (iter == perThreadReadData_.end()) {
323 // We never received any messages from this writer thread.
324 // This is okay as long as this is not a NEVER_DISCARD writer.
325 EXPECT_EQ(0, writeInfo.flags);
328 const auto& readInfo = iter->second;
329 ++readerStatsChecked;
330 totalMessagesRead += readInfo.numMessagesRead;
331 if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
332 // Non-discarding threads should never discard anything
333 EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
334 EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
336 // Other threads may have discarded some messages
337 EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
338 EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
342 EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
343 EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
345 // This test is intended to check the discard behavior.
346 // Fail the test if we didn't actually trigger any discards before we timed
348 EXPECT_GT(numDiscarded_, 0);
350 XLOG(DBG1) << totalMessagesWritten << " messages written, "
351 << totalMessagesRead << " messages read, " << numDiscarded_
352 << " messages discarded";
355 void messageReceived(StringPiece msg) {
356 if (msg.endsWith(" log messages discarded: "
357 "logging faster than we can write")) {
358 auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
359 XLOG(DBG3, "received discard notification: ", discardCount);
360 numDiscarded_ += discardCount;
361 ++discardEventsSeen_;
366 size_t messageIndex = 0;
368 parseMessage(msg, &threadID, &messageIndex);
369 } catch (const std::exception& ex) {
371 XLOG(ERR, "unable to parse log message: ", msg);
375 auto& data = perThreadReadData_[threadID];
376 data.numMessagesRead++;
377 if (messageIndex > data.lastId) {
378 data.lastId = messageIndex;
381 XLOG(ERR) << "received out-of-order messages from writer " << threadID
382 << ": " << messageIndex << " received after " << data.lastId;
386 void trailingData(StringPiece data) {
387 trailingData_ = data.str();
392 size_t numMessagesRead{0};
396 size_t numMessagesWritten{0};
400 void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
401 // Validate and strip off the message prefix and suffix
402 constexpr StringPiece prefix{"thread "};
403 if (!msg.startsWith(prefix)) {
404 throw std::runtime_error("bad message prefix");
406 msg.advance(prefix.size());
407 if (!msg.endsWith(kMsgSuffix)) {
408 throw std::runtime_error("bad message suffix");
410 msg.subtract(kMsgSuffix.size());
412 // Parse then strip off the thread index
413 auto threadIDEnd = msg.find(' ');
414 if (threadIDEnd == StringPiece::npos) {
415 throw std::runtime_error("no middle found");
417 *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
418 msg.advance(threadIDEnd);
420 // Validate that the middle of the message is what we expect,
422 constexpr StringPiece middle{" message "};
423 if (!msg.startsWith(middle)) {
424 throw std::runtime_error("bad message middle");
426 msg.advance(middle.size());
428 // Parse the message index
429 *messageIndex = folly::to<size_t>(msg);
433 * Data about each writer thread, as recorded by the reader thread.
435 * At the end of the test we will compare perThreadReadData_ (recorded by the
436 * reader) with perThreadWriteData_ (recorded by the writers) to make sure
437 * the data matches up.
439 * This is a map from writer_thread_id to ReaderData.
440 * The writer_thread_id is extracted from the received messages.
442 * This field does not need locking as it is only updated by the single
445 std::unordered_map<size_t, ReaderData> perThreadReadData_;
448 * Additional information recorded by the reader thread.
450 std::string trailingData_;
451 size_t numUnableToParse_{0};
452 size_t numOutOfOrder_{0};
453 size_t numDiscarded_{0};
456 * deadline_ is a maximum end time for the test.
458 * The writer threads quit if the deadline is reached even if they have not
459 * produced the desired number of discard events yet.
461 const std::chrono::steady_clock::time_point deadline_;
464 * How long the reader thread should sleep between each read event.
466 * This is initially set to a non-zero value (read from the
467 * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
468 * slowly, which will fill up the pipe buffer and cause discard events.
470 * Once we have produce enough discards and are ready to finish the test the
471 * main thread reduces readSleepUS_ to 0, so the reader will finish the
472 * remaining message backlog quickly.
474 std::atomic<uint64_t> readSleepUS_{0};
477 * A count of how many discard events have been seen so far.
479 * The reader increments discardEventsSeen_ each time it sees a discard
480 * notification message. A "discard event" basically corresponds to a single
481 * group of dropped messages. Once the reader pulls some messages off out of
482 * the pipe the writers should be able to send more data, but the buffer will
483 * eventually fill up again, producing another discard event.
485 std::atomic<uint64_t> discardEventsSeen_{0};
488 * Data about each writer thread, as recorded by the writers.
490 * When each writer thread finishes it records how many messages it wrote,
491 * plus the flags it used to write the messages.
493 folly::Synchronized<std::unordered_map<size_t, WriterData>>
498 * readThread() reads messages slowly from a pipe. This helps test the
499 * AsyncFileWriter behavior when I/O is slow.
501 void readThread(folly::File&& file, ReadStats* stats) {
502 std::vector<char> buffer;
505 size_t bufferIdx = 0;
508 std::this_thread::sleep_for(stats->getSleepUS());
510 auto readResult = folly::readNoInt(
511 file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
512 if (readResult < 0) {
513 XLOG(ERR, "error reading from pipe: ", errno);
516 if (readResult == 0) {
517 XLOG(DBG2, "read EOF");
521 auto logDataLen = bufferIdx + readResult;
522 StringPiece logData{buffer.data(), logDataLen};
525 auto end = logData.find('\n', idx);
526 if (end == StringPiece::npos) {
527 bufferIdx = logDataLen - idx;
528 memmove(buffer.data(), buffer.data() + idx, bufferIdx);
532 StringPiece logMsg{logData.data() + idx, end - idx};
533 stats->messageReceived(logMsg);
538 if (bufferIdx != 0) {
539 stats->trailingData(StringPiece{buffer.data(), bufferIdx});
544 * writeThread() writes a series of messages to the AsyncFileWriter
547 AsyncFileWriter* writer,
550 ReadStats* readStats) {
554 writer->writeMessage(
555 folly::to<std::string>(
556 "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
559 // Break out once the reader has seen enough discards
560 if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
561 readStats->writerFinished(id, msgID, flags);
568 * The discard test spawns a number of threads that each write a large number
569 * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
570 * thread reads from it slowly, causing a backlog to build up.
572 * The test then checks that:
573 * - The read thread always receives full messages (no partial log messages)
574 * - Messages that are received are received in order
575 * - The number of messages received plus the number reported in discard
576 * notifications matches the number of messages sent.
578 TEST(AsyncFileWriter, discard) {
579 std::array<int, 2> fds;
580 auto pipeResult = pipe(fds.data());
581 folly::checkUnixError(pipeResult, "pipe failed");
582 folly::File readPipe{fds[0], true};
583 folly::File writePipe{fds[1], true};
586 std::thread reader(readThread, std::move(readPipe), &readStats);
588 AsyncFileWriter writer{std::move(writePipe)};
590 std::vector<std::thread> writeThreads;
591 size_t numThreads = FLAGS_async_discard_num_normal_writers +
592 FLAGS_async_discard_num_nodiscard_writers;
594 for (size_t n = 0; n < numThreads; ++n) {
596 if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
597 flags = LogWriter::NEVER_DISCARD;
599 XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
601 writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
604 for (auto& t : writeThreads) {
607 XLOG(DBG2, "writers done");
609 // Clear the read sleep duration so the reader will finish quickly now
610 readStats.clearSleepDuration();
// Test binary entry point.
//
// Initializes gtest, then folly (presumably parsing gflags such as
// --logging, since FLAGS_logging is read right after).  It then configures
// folly logging from FLAGS_logging with LogLevel::INFO in synchronous
// (non-async) mode, and runs every registered test.
// NOTE(review): main()'s closing brace is not visible in this copy of the
// file (its short lines appear to have been dropped) — confirm it exists.
615 int main(int argc, char* argv[]) {
616 testing::InitGoogleTest(&argc, argv);
617 folly::init(&argc, &argv);
618 // Don't use async logging in the async logging tests :-)
619 folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false);
621 return RUN_ALL_TESTS();