2 * Copyright 2004-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/Conv.h>
19 #include <folly/Exception.h>
20 #include <folly/File.h>
21 #include <folly/FileUtil.h>
22 #include <folly/String.h>
23 #include <folly/experimental/TestUtil.h>
24 #include <folly/experimental/logging/AsyncFileWriter.h>
25 #include <folly/experimental/logging/LoggerDB.h>
26 #include <folly/futures/Future.h>
27 #include <folly/futures/Promise.h>
28 #include <folly/portability/GFlags.h>
29 #include <folly/portability/GMock.h>
30 #include <folly/portability/GTest.h>
31 #include <folly/portability/Unistd.h>
34 async_discard_num_writer_threads,
36 "number of threads to use to generate log messages during "
37 "the AsyncFileWriter.discard test");
39 async_discard_messages_per_writer,
41 "number of messages each writer threads should generate in "
42 "the AsyncFileWriter.discard test");
44 async_discard_read_sleep_usec,
46 "how long the read thread should sleep between reads in "
47 "the AsyncFileWriter.discard test");
49 using namespace folly;
50 using namespace std::literals::chrono_literals;
51 using folly::test::TemporaryFile;
53 TEST(AsyncFileWriter, noMessages) {
54 TemporaryFile tmpFile{"logging_test"};
56 // Test the simple construction and destruction of an AsyncFileWriter
57 // without ever writing any messages. This still exercises the I/O
58 // thread start-up and shutdown code.
59 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
62 TEST(AsyncFileWriter, simpleMessages) {
63 TemporaryFile tmpFile{"logging_test"};
66 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
67 for (int n = 0; n < 10; ++n) {
68 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
74 auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
77 std::string expected =
88 EXPECT_EQ(expected, data);
93 static std::vector<std::string>* internalWarnings;
95 void handleLoggingError(
96 StringPiece /* file */,
99 internalWarnings->emplace_back(std::move(msg));
103 TEST(AsyncFileWriter, ioError) {
104 // Set the LoggerDB internal warning handler so we can record the messages
105 std::vector<std::string> logErrors;
106 internalWarnings = &logErrors;
107 LoggerDB::setInternalWarningHandler(handleLoggingError);
109 // Create an AsyncFileWriter that refers to a pipe whose read end is closed
110 std::array<int, 2> fds;
111 auto rc = pipe(fds.data());
112 folly::checkUnixError(rc, "failed to create pipe");
113 signal(SIGPIPE, SIG_IGN);
116 // Log a bunch of messages to the writer
117 size_t numMessages = 100;
119 AsyncFileWriter writer{folly::File{fds[1], true}};
120 for (size_t n = 0; n < numMessages; ++n) {
121 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
126 LoggerDB::setInternalWarningHandler(nullptr);
128 // AsyncFileWriter should have some internal warning messages about the
129 // log failures. This will generally be many fewer than the number of
130 // messages we wrote, though, since it performs write batching.
131 for (const auto& msg : logErrors) {
134 testing::ContainsRegex(
135 "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
137 EXPECT_GT(logErrors.size(), 0);
138 EXPECT_LE(logErrors.size(), numMessages);
142 size_t fillUpPipe(int fd) {
143 int flags = fcntl(fd, F_GETFL);
144 folly::checkUnixError(flags, "failed get file descriptor flags");
145 auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
146 folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
147 std::vector<char> data;
149 size_t totalBytes = 0;
150 size_t bytesToWrite = data.size();
152 auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
153 if (bytesWritten < 0) {
154 if (errno == EAGAIN || errno == EWOULDBLOCK) {
155 // We blocked. Keep trying smaller writes, until we get down to a
156 // single byte, just to make sure the logging code really won't be able
157 // to write anything to the pipe.
158 if (bytesToWrite <= 1) {
164 throwSystemError("error writing to pipe");
167 totalBytes += bytesWritten;
170 fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
172 rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
173 folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
179 TEST(AsyncFileWriter, flush) {
180 // Set up a pipe(), then write data to the write endpoint until it fills up
181 // and starts blocking.
182 std::array<int, 2> fds;
183 auto rc = pipe(fds.data());
184 folly::checkUnixError(rc, "failed to create pipe");
185 File readPipe{fds[0], true};
186 File writePipe{fds[1], true};
188 auto paddingSize = fillUpPipe(writePipe.fd());
190 // Now set up an AsyncFileWriter pointing at the write end of the pipe
191 AsyncFileWriter writer{std::move(writePipe)};
194 writer.writeMessage(std::string{"test message"});
196 // Call flush(). Use a separate thread, since this should block until we
197 // consume data from the pipe.
198 Promise<Unit> promise;
199 auto future = promise.getFuture();
200 auto flushFunction = [&] { writer.flush(); };
201 std::thread flushThread{
202 [&]() { promise.setTry(makeTryWith(flushFunction)); }};
204 // Sleep briefly, and make sure flush() still hasn't completed.
206 std::this_thread::sleep_for(10ms);
207 EXPECT_FALSE(future.isReady());
209 // Now read from the pipe
210 std::vector<char> buf;
211 buf.resize(paddingSize);
212 readFull(readPipe.fd(), buf.data(), buf.size());
214 // Make sure flush completes successfully now
221 * writeThread() writes a series of messages to the AsyncFileWriter
224 AsyncFileWriter* writer,
228 for (size_t n = 0; n < numMessages; ++n) {
229 writer->writeMessage(
230 folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
237 : readSleepUS_{static_cast<uint64_t>(
238 std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
240 void clearSleepDuration() {
241 readSleepUS_.store(0);
243 std::chrono::microseconds getSleepUS() const {
244 return std::chrono::microseconds{readSleepUS_.load()};
247 void check(size_t numThreads, size_t messagesPerThread) {
248 EXPECT_EQ("", trailingData_);
249 EXPECT_EQ(numThreads, writers_.size());
250 size_t totalMessagesReceived = 0;
251 for (const auto& writerData : writers_) {
252 EXPECT_LE(writerData.numMessages, messagesPerThread);
253 EXPECT_LE(writerData.lastId, messagesPerThread);
254 totalMessagesReceived += writerData.numMessages;
257 EXPECT_EQ(0, numUnableToParse_);
258 EXPECT_EQ(0, numOutOfOrder_);
260 numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
264 * Check that no messages were dropped from the specified thread.
266 void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
267 EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
268 EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
271 void messageReceived(StringPiece msg) {
272 if (msg.endsWith(" log messages discarded: "
273 "logging faster than we can write")) {
274 auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
275 fprintf(stderr, "received discard notification: %zu\n", discardCount);
276 numDiscarded_ += discardCount;
281 size_t messageIndex = 0;
283 parseMessage(msg, &threadID, &messageIndex);
284 } catch (const std::exception& ex) {
288 "unable to parse log message: %s\n",
289 folly::humanify(msg.str()).c_str());
293 if (threadID >= writers_.size()) {
294 writers_.resize(threadID + 1);
296 writers_[threadID].numMessages++;
297 if (messageIndex > writers_[threadID].lastId) {
298 writers_[threadID].lastId = messageIndex;
303 "received out-of-order messages from writer %zu: "
304 "%zu received after %zu\n",
307 writers_[threadID].lastId);
311 void trailingData(StringPiece data) {
312 trailingData_ = data.str();
317 size_t numMessages{0};
321 void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
322 constexpr StringPiece prefix{"thread "};
323 constexpr StringPiece middle{" message "};
324 if (!msg.startsWith(prefix)) {
325 throw std::runtime_error("bad message prefix");
328 auto idx = prefix.size();
329 auto end = msg.find(' ', idx);
330 if (end == StringPiece::npos) {
331 throw std::runtime_error("no middle found");
334 *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
335 auto rest = msg.subpiece(end);
336 if (!rest.startsWith(middle)) {
337 throw std::runtime_error("bad message middle");
340 rest.advance(middle.size());
341 *messageIndex = folly::to<size_t>(rest);
344 std::vector<WriterStats> writers_;
345 std::string trailingData_;
346 size_t numUnableToParse_{0};
347 size_t numOutOfOrder_{0};
348 size_t numDiscarded_{0};
350 std::atomic<uint64_t> readSleepUS_;
354 * readThread() reads messages slowly from a pipe. This helps test the
355 * AsyncFileWriter behavior when I/O is slow.
357 void readThread(folly::File&& file, ReadStats* stats) {
358 std::vector<char> buffer;
361 size_t bufferIdx = 0;
364 std::this_thread::sleep_for(stats->getSleepUS());
366 auto readResult = folly::readNoInt(
367 file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
368 if (readResult < 0) {
369 fprintf(stderr, "error reading from pipe: %d\n", errno);
372 if (readResult == 0) {
373 fprintf(stderr, "read EOF\n");
377 auto logDataLen = bufferIdx + readResult;
378 StringPiece logData{buffer.data(), logDataLen};
381 auto end = logData.find('\n', idx);
382 if (end == StringPiece::npos) {
383 bufferIdx = logDataLen - idx;
384 memmove(buffer.data(), buffer.data() + idx, bufferIdx);
388 StringPiece logMsg{logData.data() + idx, end - idx};
389 stats->messageReceived(logMsg);
394 if (bufferIdx != 0) {
395 stats->trailingData(StringPiece{buffer.data(), bufferIdx});
400 * The discard test spawns a number of threads that each write a large number
 * of messages quickly.  The AsyncFileWriter writes to a pipe, and a separate
402 * thread reads from it slowly, causing a backlog to build up.
404 * The test then checks that:
405 * - The read thread always receives full messages (no partial log messages)
406 * - Messages that are received are received in order
407 * - The number of messages received plus the number reported in discard
408 * notifications matches the number of messages sent.
410 TEST(AsyncFileWriter, discard) {
411 std::array<int, 2> fds;
412 auto pipeResult = pipe(fds.data());
413 folly::checkUnixError(pipeResult, "pipe failed");
414 folly::File readPipe{fds[0], true};
415 folly::File writePipe{fds[1], true};
417 // This test should always be run with at least 2 writer threads.
418 // The last one will use the NEVER_DISCARD flag, and we want at least
419 // one that does discard messages.
420 ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
423 std::thread reader(readThread, std::move(readPipe), &readStats);
425 AsyncFileWriter writer{std::move(writePipe)};
427 std::vector<std::thread> writeThreads;
428 for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
430 // Configure the last writer thread to never drop messages
431 if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
432 flags = LogWriter::NEVER_DISCARD;
435 writeThreads.emplace_back(
439 FLAGS_async_discard_messages_per_writer,
443 for (auto& t : writeThreads) {
446 fprintf(stderr, "writers done\n");
448 // Clear the read sleep duration so the reader will finish quickly now
449 readStats.clearSleepDuration();
452 FLAGS_async_discard_num_writer_threads,
453 FLAGS_async_discard_messages_per_writer);
454 // Check that no messages were dropped from the thread using the
455 // NEVER_DISCARD flag.
456 readStats.checkNoDrops(
457 FLAGS_async_discard_num_writer_threads - 1,
458 FLAGS_async_discard_messages_per_writer);