2 * Copyright 2004-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/Conv.h>
17 #include <folly/Exception.h>
18 #include <folly/File.h>
19 #include <folly/FileUtil.h>
20 #include <folly/String.h>
21 #include <folly/experimental/TestUtil.h>
22 #include <folly/experimental/logging/AsyncFileWriter.h>
23 #include <folly/experimental/logging/LoggerDB.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Unistd.h>
30 async_discard_num_writer_threads,
32 "number of threads to use to generate log messages during "
33 "the AsyncFileWriter.discard test");
35 async_discard_messages_per_writer,
37 "number of messages each writer threads should generate in "
38 "the AsyncFileWriter.discard test");
40 async_discard_read_sleep_usec,
42 "how long the read thread should sleep between reads in "
43 "the AsyncFileWriter.discard test");
45 using namespace folly;
46 using namespace std::literals::chrono_literals;
47 using folly::test::TemporaryFile;
49 TEST(AsyncFileWriter, noMessages) {
50 TemporaryFile tmpFile{"logging_test"};
52 // Test the simple construction and destruction of an AsyncFileWriter
53 // without ever writing any messages. This still exercises the I/O
54 // thread start-up and shutdown code.
55 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
58 TEST(AsyncFileWriter, simpleMessages) {
59 TemporaryFile tmpFile{"logging_test"};
62 AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
63 for (int n = 0; n < 10; ++n) {
64 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
70 auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
73 std::string expected =
84 EXPECT_EQ(expected, data);
89 static std::vector<std::string>* internalWarnings;
91 void handleLoggingError(
92 StringPiece /* file */,
95 internalWarnings->emplace_back(std::move(msg));
99 TEST(AsyncFileWriter, ioError) {
100 // Set the LoggerDB internal warning handler so we can record the messages
101 std::vector<std::string> logErrors;
102 internalWarnings = &logErrors;
103 LoggerDB::setInternalWarningHandler(handleLoggingError);
105 // Create an AsyncFileWriter that refers to a pipe whose read end is closed
106 std::array<int, 2> fds;
107 auto rc = pipe(fds.data());
108 folly::checkUnixError(rc, "failed to create pipe");
109 signal(SIGPIPE, SIG_IGN);
112 // Log a bunch of messages to the writer
113 size_t numMessages = 100;
115 AsyncFileWriter writer{folly::File{fds[1], true}};
116 for (size_t n = 0; n < numMessages; ++n) {
117 writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
122 LoggerDB::setInternalWarningHandler(nullptr);
124 // AsyncFileWriter should have some internal warning messages about the
125 // log failures. This will generally be many fewer than the number of
126 // messages we wrote, though, since it performs write batching.
127 for (const auto& msg : logErrors) {
130 testing::ContainsRegex(
131 "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
133 EXPECT_GT(logErrors.size(), 0);
134 EXPECT_LE(logErrors.size(), numMessages);
139 * writeThread() writes a series of messages to the AsyncFileWriter
142 AsyncFileWriter* writer,
146 for (size_t n = 0; n < numMessages; ++n) {
147 writer->writeMessage(
148 folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
155 : readSleepUS_{static_cast<uint64_t>(
156 std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
158 void clearSleepDuration() {
159 readSleepUS_.store(0);
161 std::chrono::microseconds getSleepUS() const {
162 return std::chrono::microseconds{readSleepUS_.load()};
165 void check(size_t numThreads, size_t messagesPerThread) {
166 EXPECT_EQ("", trailingData_);
167 EXPECT_EQ(numThreads, writers_.size());
168 size_t totalMessagesReceived = 0;
169 for (const auto& writerData : writers_) {
170 EXPECT_LE(writerData.numMessages, messagesPerThread);
171 EXPECT_LE(writerData.lastId, messagesPerThread);
172 totalMessagesReceived += writerData.numMessages;
175 EXPECT_EQ(0, numUnableToParse_);
176 EXPECT_EQ(0, numOutOfOrder_);
178 numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
182 * Check that no messages were dropped from the specified thread.
184 void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
185 EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
186 EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
189 void messageReceived(StringPiece msg) {
190 if (msg.endsWith(" log messages discarded: "
191 "logging faster than we can write")) {
192 auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
193 fprintf(stderr, "received discard notification: %zu\n", discardCount);
194 numDiscarded_ += discardCount;
199 size_t messageIndex = 0;
201 parseMessage(msg, &threadID, &messageIndex);
202 } catch (const std::exception& ex) {
206 "unable to parse log message: %s\n",
207 folly::humanify(msg.str()).c_str());
211 if (threadID >= writers_.size()) {
212 writers_.resize(threadID + 1);
214 writers_[threadID].numMessages++;
215 if (messageIndex > writers_[threadID].lastId) {
216 writers_[threadID].lastId = messageIndex;
221 "received out-of-order messages from writer %zu: "
222 "%zu received after %zu\n",
225 writers_[threadID].lastId);
229 void trailingData(StringPiece data) {
230 trailingData_ = data.str();
235 size_t numMessages{0};
239 void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
240 constexpr StringPiece prefix{"thread "};
241 constexpr StringPiece middle{" message "};
242 if (!msg.startsWith(prefix)) {
243 throw std::runtime_error("bad message prefix");
246 auto idx = prefix.size();
247 auto end = msg.find(' ', idx);
248 if (end == StringPiece::npos) {
249 throw std::runtime_error("no middle found");
252 *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
253 auto rest = msg.subpiece(end);
254 if (!rest.startsWith(middle)) {
255 throw std::runtime_error("bad message middle");
258 rest.advance(middle.size());
259 *messageIndex = folly::to<size_t>(rest);
262 std::vector<WriterStats> writers_;
263 std::string trailingData_;
264 size_t numUnableToParse_{0};
265 size_t numOutOfOrder_{0};
266 size_t numDiscarded_{0};
268 std::atomic<uint64_t> readSleepUS_;
272 * readThread() reads messages slowly from a pipe. This helps test the
273 * AsyncFileWriter behavior when I/O is slow.
275 void readThread(folly::File&& file, ReadStats* stats) {
276 std::vector<char> buffer;
279 size_t bufferIdx = 0;
282 usleep(stats->getSleepUS().count());
284 auto readResult = folly::readNoInt(
285 file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
286 if (readResult < 0) {
287 fprintf(stderr, "error reading from pipe: %d\n", errno);
290 if (readResult == 0) {
291 fprintf(stderr, "read EOF\n");
295 auto logDataLen = bufferIdx + readResult;
296 StringPiece logData{buffer.data(), logDataLen};
299 auto end = logData.find('\n', idx);
300 if (end == StringPiece::npos) {
301 bufferIdx = logDataLen - idx;
302 memmove(buffer.data(), buffer.data() + idx, bufferIdx);
306 StringPiece logMsg{logData.data() + idx, end - idx};
307 stats->messageReceived(logMsg);
312 if (bufferIdx != 0) {
313 stats->trailingData(StringPiece{buffer.data(), bufferIdx});
318 * The discard test spawns a number of threads that each write a large number
319 * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
320 * thread reads from it slowly, causing a backlog to build up.
322 * The test then checks that:
323 * - The read thread always receives full messages (no partial log messages)
324 * - Messages that are received are received in order
325 * - The number of messages received plus the number reported in discard
326 * notifications matches the number of messages sent.
328 TEST(AsyncFileWriter, discard) {
329 std::array<int, 2> fds;
330 auto pipeResult = pipe(fds.data());
331 folly::checkUnixError(pipeResult, "pipe failed");
332 folly::File readPipe{fds[0], true};
333 folly::File writePipe{fds[1], true};
335 // This test should always be run with at least 2 writer threads.
336 // The last one will use the NEVER_DISCARD flag, and we want at least
337 // one that does discard messages.
338 ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
341 std::thread reader(readThread, std::move(readPipe), &readStats);
343 AsyncFileWriter writer{std::move(writePipe)};
345 std::vector<std::thread> writeThreads;
346 for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
348 // Configure the last writer thread to never drop messages
349 if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
350 flags = LogWriter::NEVER_DISCARD;
353 writeThreads.emplace_back(
357 FLAGS_async_discard_messages_per_writer,
361 for (auto& t : writeThreads) {
364 fprintf(stderr, "writers done\n");
366 // Clear the read sleep duration so the reader will finish quickly now
367 readStats.clearSleepDuration();
370 FLAGS_async_discard_num_writer_threads,
371 FLAGS_async_discard_messages_per_writer);
372 // Check that no messages were dropped from the thread using the
373 // NEVER_DISCARD flag.
374 readStats.checkNoDrops(
375 FLAGS_async_discard_num_writer_threads - 1,
376 FLAGS_async_discard_messages_per_writer);