32dd790d1c7d2e37c61b02b5f530b2e6d01ae3e2
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <thread>
17
18 #include <folly/Conv.h>
19 #include <folly/Exception.h>
20 #include <folly/File.h>
21 #include <folly/FileUtil.h>
22 #include <folly/String.h>
23 #include <folly/Synchronized.h>
24 #include <folly/experimental/TestUtil.h>
25 #include <folly/experimental/logging/AsyncFileWriter.h>
26 #include <folly/experimental/logging/Init.h>
27 #include <folly/experimental/logging/LoggerDB.h>
28 #include <folly/experimental/logging/xlog.h>
29 #include <folly/futures/Future.h>
30 #include <folly/futures/Promise.h>
31 #include <folly/init/Init.h>
32 #include <folly/portability/GFlags.h>
33 #include <folly/portability/GMock.h>
34 #include <folly/portability/GTest.h>
35 #include <folly/portability/Unistd.h>
36
37 DEFINE_string(logging, "", "folly::logging configuration");
38 DEFINE_int64(
39     async_discard_num_normal_writers,
40     30,
41     "number of threads to use to generate normal log messages during "
42     "the AsyncFileWriter.discard test");
43 DEFINE_int64(
44     async_discard_num_nodiscard_writers,
45     2,
46     "number of threads to use to generate non-discardable log messages during "
47     "the AsyncFileWriter.discard test");
48 DEFINE_int64(
49     async_discard_read_sleep_usec,
50     500,
51     "how long the read thread should sleep between reads in "
52     "the AsyncFileWriter.discard test");
53 DEFINE_int64(
54     async_discard_timeout_msec,
55     10000,
56     "A timeout for the AsyncFileWriter.discard test if it cannot generate "
57     "enough discards");
58 DEFINE_int64(
59     async_discard_num_events,
60     10,
61     "The number of discard events to wait for in the AsyncFileWriter.discard "
62     "test");
63
64 using namespace folly;
65 using namespace std::literals::chrono_literals;
66 using folly::test::TemporaryFile;
67 using std::chrono::steady_clock;
68 using std::chrono::milliseconds;
69
70 TEST(AsyncFileWriter, noMessages) {
71   TemporaryFile tmpFile{"logging_test"};
72
73   // Test the simple construction and destruction of an AsyncFileWriter
74   // without ever writing any messages.  This still exercises the I/O
75   // thread start-up and shutdown code.
76   AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
77 }
78
79 TEST(AsyncFileWriter, simpleMessages) {
80   TemporaryFile tmpFile{"logging_test"};
81
82   {
83     AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
84     for (int n = 0; n < 10; ++n) {
85       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
86       sched_yield();
87     }
88   }
89   tmpFile.close();
90
91   std::string data;
92   auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
93   ASSERT_TRUE(ret);
94
95   std::string expected =
96       "message 0\n"
97       "message 1\n"
98       "message 2\n"
99       "message 3\n"
100       "message 4\n"
101       "message 5\n"
102       "message 6\n"
103       "message 7\n"
104       "message 8\n"
105       "message 9\n";
106   EXPECT_EQ(expected, data);
107 }
108
109 namespace {
110 static std::vector<std::string>* internalWarnings;
111
112 void handleLoggingError(
113     StringPiece /* file */,
114     int /* lineNumber */,
115     std::string&& msg) {
116   internalWarnings->emplace_back(std::move(msg));
117 }
118 }
119
120 TEST(AsyncFileWriter, ioError) {
121   // Set the LoggerDB internal warning handler so we can record the messages
122   std::vector<std::string> logErrors;
123   internalWarnings = &logErrors;
124   LoggerDB::setInternalWarningHandler(handleLoggingError);
125
126   // Create an AsyncFileWriter that refers to a pipe whose read end is closed
127   std::array<int, 2> fds;
128   auto rc = pipe(fds.data());
129   folly::checkUnixError(rc, "failed to create pipe");
130 #ifndef _WIN32
131   signal(SIGPIPE, SIG_IGN);
132 #endif
133   ::close(fds[0]);
134
135   // Log a bunch of messages to the writer
136   size_t numMessages = 100;
137   {
138     AsyncFileWriter writer{folly::File{fds[1], true}};
139     for (size_t n = 0; n < numMessages; ++n) {
140       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
141       sched_yield();
142     }
143   }
144
145   LoggerDB::setInternalWarningHandler(nullptr);
146
147   // AsyncFileWriter should have some internal warning messages about the
148   // log failures.  This will generally be many fewer than the number of
149   // messages we wrote, though, since it performs write batching.
150   //
151   // GTest on Windows doesn't support alternation in the regex syntax -_-....
152   const std::string kExpectedErrorMessage =
153 #if _WIN32
154       // The `pipe` call above is actually implemented via sockets, so we get
155       // a different error message.
156       "An established connection was aborted by the software in your host machine\\.";
157 #else
158       "Broken pipe";
159 #endif
160
161   for (const auto& msg : logErrors) {
162     EXPECT_THAT(
163         msg,
164         testing::ContainsRegex(
165             "error writing to log file .* in AsyncFileWriter.*: " +
166             kExpectedErrorMessage));
167   }
168   EXPECT_GT(logErrors.size(), 0);
169   EXPECT_LE(logErrors.size(), numMessages);
170 }
171
172 namespace {
173 size_t fillUpPipe(int fd) {
174   int flags = fcntl(fd, F_GETFL);
175   folly::checkUnixError(flags, "failed get file descriptor flags");
176   auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
177   folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
178   std::vector<char> data;
179   data.resize(4000);
180   size_t totalBytes = 0;
181   size_t bytesToWrite = data.size();
182   while (true) {
183     auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
184     if (bytesWritten < 0) {
185       if (errno == EAGAIN || errno == EWOULDBLOCK) {
186         // We blocked.  Keep trying smaller writes, until we get down to a
187         // single byte, just to make sure the logging code really won't be able
188         // to write anything to the pipe.
189         if (bytesToWrite <= 1) {
190           break;
191         } else {
192           bytesToWrite /= 2;
193         }
194       } else {
195         throwSystemError("error writing to pipe");
196       }
197     } else {
198       totalBytes += bytesWritten;
199     }
200   }
201   XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
202
203   rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
204   folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
205
206   return totalBytes;
207 }
208 }
209
210 TEST(AsyncFileWriter, flush) {
211   // Set up a pipe(), then write data to the write endpoint until it fills up
212   // and starts blocking.
213   std::array<int, 2> fds;
214   auto rc = pipe(fds.data());
215   folly::checkUnixError(rc, "failed to create pipe");
216   File readPipe{fds[0], true};
217   File writePipe{fds[1], true};
218
219   auto paddingSize = fillUpPipe(writePipe.fd());
220
221   // Now set up an AsyncFileWriter pointing at the write end of the pipe
222   AsyncFileWriter writer{std::move(writePipe)};
223
224   // Write a message
225   writer.writeMessage("test message: " + std::string(200, 'x'));
226
227   // Call flush().  Use a separate thread, since this should block until we
228   // consume data from the pipe.
229   Promise<Unit> promise;
230   auto future = promise.getFuture();
231   auto flushFunction = [&] { writer.flush(); };
232   std::thread flushThread{
233       [&]() { promise.setTry(makeTryWith(flushFunction)); }};
234   // Detach the flush thread now rather than joining it at the end of the
235   // function.  This way if something goes wrong during the test we will fail
236   // with the real error, rather than crashing due to the std::thread
237   // destructor running on a still-joinable thread.
238   flushThread.detach();
239
240   // Sleep briefly, and make sure flush() still hasn't completed.
241   // If it has completed this doesn't necessarily indicate a bug in
242   // AsyncFileWriter, but instead indicates that our test code failed to
243   // successfully cause a blocking write.
244   /* sleep override */
245   std::this_thread::sleep_for(10ms);
246   EXPECT_FALSE(future.isReady());
247
248   // Now read from the pipe
249   std::vector<char> buf;
250   buf.resize(paddingSize);
251   readFull(readPipe.fd(), buf.data(), buf.size());
252
253   // Make sure flush completes successfully now
254   future.get(10ms);
255 }
256
257 // A large-ish message suffix, just to consume space and help fill up
258 // log buffers faster.
259 static constexpr StringPiece kMsgSuffix{
260     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
261     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
262     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
263     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
264
265 class ReadStats {
266  public:
267   ReadStats()
268       : deadline_{steady_clock::now() +
269                   milliseconds{FLAGS_async_discard_timeout_msec}},
270         readSleepUS_{static_cast<uint64_t>(
271             std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
272
273   void clearSleepDuration() {
274     readSleepUS_.store(0);
275   }
276   std::chrono::microseconds getSleepUS() const {
277     return std::chrono::microseconds{readSleepUS_.load()};
278   }
279
280   bool shouldWriterStop() const {
281     // Stop after we have seen the required number of separate discard events.
282     // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
283     // ensures the async writer blocks and then makes progress again multiple
284     // times.
285     if (FLAGS_async_discard_num_events > 0 &&
286         discardEventsSeen_.load() >
287             static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
288       return true;
289     }
290
291     // Stop after a timeout, even if we don't hit the number of requested
292     // discards.
293     return steady_clock::now() > deadline_;
294   }
295   void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
296     auto map = perThreadWriteData_.wlock();
297     assert(map->find(threadID) == map->end());
298     auto& data = (*map)[threadID];
299     data.numMessagesWritten = messagesWritten;
300     data.flags = flags;
301   }
302
303   void check() {
304     auto writeDataMap = perThreadWriteData_.wlock();
305
306     EXPECT_EQ("", trailingData_);
307     EXPECT_EQ(0, numUnableToParse_);
308     EXPECT_EQ(0, numOutOfOrder_);
309
310     // Check messages received from each writer thread
311     size_t readerStatsChecked = 0;
312     size_t totalMessagesWritten = 0;
313     size_t totalMessagesRead = 0;
314     for (const auto& writeEntry : *writeDataMap) {
315       const auto& writeInfo = writeEntry.second;
316       totalMessagesWritten += writeInfo.numMessagesWritten;
317
318       auto iter = perThreadReadData_.find(writeEntry.first);
319       if (iter == perThreadReadData_.end()) {
320         // We never received any messages from this writer thread.
321         // This is okay as long as this is not a NEVER_DISCARD writer.
322         EXPECT_EQ(0, writeInfo.flags);
323         continue;
324       }
325       const auto& readInfo = iter->second;
326       ++readerStatsChecked;
327       totalMessagesRead += readInfo.numMessagesRead;
328       if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
329         // Non-discarding threads should never discard anything
330         EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
331         EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
332       } else {
333         // Other threads may have discarded some messages
334         EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
335         EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
336       }
337     }
338
339     EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
340     EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
341
342     // This test is intended to check the discard behavior.
343     // Fail the test if we didn't actually trigger any discards before we timed
344     // out.
345     EXPECT_GT(numDiscarded_, 0);
346
347     XLOG(DBG1) << totalMessagesWritten << " messages written, "
348                << totalMessagesRead << " messages read, " << numDiscarded_
349                << " messages discarded";
350   }
351
352   void messageReceived(StringPiece msg) {
353     if (msg.endsWith(" log messages discarded: "
354                      "logging faster than we can write")) {
355       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
356       XLOG(DBG3, "received discard notification: ", discardCount);
357       numDiscarded_ += discardCount;
358       ++discardEventsSeen_;
359       return;
360     }
361
362     size_t threadID = 0;
363     size_t messageIndex = 0;
364     try {
365       parseMessage(msg, &threadID, &messageIndex);
366     } catch (const std::exception& ex) {
367       ++numUnableToParse_;
368       XLOG(ERR, "unable to parse log message: ", msg);
369       return;
370     }
371
372     auto& data = perThreadReadData_[threadID];
373     data.numMessagesRead++;
374     if (messageIndex > data.lastId) {
375       data.lastId = messageIndex;
376     } else {
377       ++numOutOfOrder_;
378       XLOG(ERR) << "received out-of-order messages from writer " << threadID
379                 << ": " << messageIndex << " received after " << data.lastId;
380     }
381   }
382
383   void trailingData(StringPiece data) {
384     trailingData_ = data.str();
385   }
386
387  private:
388   struct ReaderData {
389     size_t numMessagesRead{0};
390     size_t lastId{0};
391   };
392   struct WriterData {
393     size_t numMessagesWritten{0};
394     int flags{0};
395   };
396
397   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
398     // Validate and strip off the message prefix and suffix
399     constexpr StringPiece prefix{"thread "};
400     if (!msg.startsWith(prefix)) {
401       throw std::runtime_error("bad message prefix");
402     }
403     msg.advance(prefix.size());
404     if (!msg.endsWith(kMsgSuffix)) {
405       throw std::runtime_error("bad message suffix");
406     }
407     msg.subtract(kMsgSuffix.size());
408
409     // Parse then strip off the thread index
410     auto threadIDEnd = msg.find(' ');
411     if (threadIDEnd == StringPiece::npos) {
412       throw std::runtime_error("no middle found");
413     }
414     *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
415     msg.advance(threadIDEnd);
416
417     // Validate that the middle of the message is what we expect,
418     // then strip it off
419     constexpr StringPiece middle{" message "};
420     if (!msg.startsWith(middle)) {
421       throw std::runtime_error("bad message middle");
422     }
423     msg.advance(middle.size());
424
425     // Parse the message index
426     *messageIndex = folly::to<size_t>(msg);
427   }
428
429   /**
430    * Data about each writer thread, as recorded by the reader thread.
431    *
432    * At the end of the test we will compare perThreadReadData_ (recorded by the
433    * reader) with perThreadWriteData_ (recorded by the writers) to make sure
434    * the data matches up.
435    *
436    * This is a map from writer_thread_id to ReaderData.
437    * The writer_thread_id is extracted from the received messages.
438    *
439    * This field does not need locking as it is only updated by the single
440    * reader thread.
441    */
442   std::unordered_map<size_t, ReaderData> perThreadReadData_;
443
444   /*
445    * Additional information recorded by the reader thread.
446    */
447   std::string trailingData_;
448   size_t numUnableToParse_{0};
449   size_t numOutOfOrder_{0};
450   size_t numDiscarded_{0};
451
452   /**
453    * deadline_ is a maximum end time for the test.
454    *
455    * The writer threads quit if the deadline is reached even if they have not
456    * produced the desired number of discard events yet.
457    */
458   const std::chrono::steady_clock::time_point deadline_;
459
460   /**
461    * How long the reader thread should sleep between each read event.
462    *
463    * This is initially set to a non-zero value (read from the
464    * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
465    * slowly, which will fill up the pipe buffer and cause discard events.
466    *
467    * Once we have produce enough discards and are ready to finish the test the
468    * main thread reduces readSleepUS_ to 0, so the reader will finish the
469    * remaining message backlog quickly.
470    */
471   std::atomic<uint64_t> readSleepUS_{0};
472
473   /**
474    * A count of how many discard events have been seen so far.
475    *
476    * The reader increments discardEventsSeen_ each time it sees a discard
477    * notification message.  A "discard event" basically corresponds to a single
478    * group of dropped messages.  Once the reader pulls some messages off out of
479    * the pipe the writers should be able to send more data, but the buffer will
480    * eventually fill up again, producing another discard event.
481    */
482   std::atomic<uint64_t> discardEventsSeen_{0};
483
484   /**
485    * Data about each writer thread, as recorded by the writers.
486    *
487    * When each writer thread finishes it records how many messages it wrote,
488    * plus the flags it used to write the messages.
489    */
490   folly::Synchronized<std::unordered_map<size_t, WriterData>>
491       perThreadWriteData_;
492 };
493
494 /**
495  * readThread() reads messages slowly from a pipe.  This helps test the
496  * AsyncFileWriter behavior when I/O is slow.
497  */
498 void readThread(folly::File&& file, ReadStats* stats) {
499   std::vector<char> buffer;
500   buffer.resize(1024);
501
502   size_t bufferIdx = 0;
503   while (true) {
504     /* sleep override */
505     std::this_thread::sleep_for(stats->getSleepUS());
506
507     auto readResult = folly::readNoInt(
508         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
509     if (readResult < 0) {
510       XLOG(ERR, "error reading from pipe: ", errno);
511       return;
512     }
513     if (readResult == 0) {
514       XLOG(DBG2, "read EOF");
515       break;
516     }
517
518     auto logDataLen = bufferIdx + readResult;
519     StringPiece logData{buffer.data(), logDataLen};
520     auto idx = 0;
521     while (true) {
522       auto end = logData.find('\n', idx);
523       if (end == StringPiece::npos) {
524         bufferIdx = logDataLen - idx;
525         memmove(buffer.data(), buffer.data() + idx, bufferIdx);
526         break;
527       }
528
529       StringPiece logMsg{logData.data() + idx, end - idx};
530       stats->messageReceived(logMsg);
531       idx = end + 1;
532     }
533   }
534
535   if (bufferIdx != 0) {
536     stats->trailingData(StringPiece{buffer.data(), bufferIdx});
537   }
538 }
539
540 /**
541  * writeThread() writes a series of messages to the AsyncFileWriter
542  */
543 void writeThread(
544     AsyncFileWriter* writer,
545     size_t id,
546     uint32_t flags,
547     ReadStats* readStats) {
548   size_t msgID = 0;
549   while (true) {
550     ++msgID;
551     writer->writeMessage(
552         folly::to<std::string>(
553             "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
554         flags);
555
556     // Break out once the reader has seen enough discards
557     if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
558       readStats->writerFinished(id, msgID, flags);
559       break;
560     }
561   }
562 }
563
564 /*
565  * The discard test spawns a number of threads that each write a large number
566  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
567  * thread reads from it slowly, causing a backlog to build up.
568  *
569  * The test then checks that:
570  * - The read thread always receives full messages (no partial log messages)
571  * - Messages that are received are received in order
572  * - The number of messages received plus the number reported in discard
573  *   notifications matches the number of messages sent.
574  */
575 TEST(AsyncFileWriter, discard) {
576   std::array<int, 2> fds;
577   auto pipeResult = pipe(fds.data());
578   folly::checkUnixError(pipeResult, "pipe failed");
579   folly::File readPipe{fds[0], true};
580   folly::File writePipe{fds[1], true};
581
582   ReadStats readStats;
583   std::thread reader(readThread, std::move(readPipe), &readStats);
584   {
585     AsyncFileWriter writer{std::move(writePipe)};
586
587     std::vector<std::thread> writeThreads;
588     size_t numThreads = FLAGS_async_discard_num_normal_writers +
589         FLAGS_async_discard_num_nodiscard_writers;
590
591     for (size_t n = 0; n < numThreads; ++n) {
592       uint32_t flags = 0;
593       if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
594         flags = LogWriter::NEVER_DISCARD;
595       }
596       XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
597
598       writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
599     }
600
601     for (auto& t : writeThreads) {
602       t.join();
603     }
604     XLOG(DBG2, "writers done");
605   }
606   // Clear the read sleep duration so the reader will finish quickly now
607   readStats.clearSleepDuration();
608   reader.join();
609   readStats.check();
610 }
611
612 int main(int argc, char* argv[]) {
613   testing::InitGoogleTest(&argc, argv);
614   folly::init(&argc, &argv);
615   // Don't use async logging in the async logging tests :-)
616   folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false);
617
618   return RUN_ALL_TESTS();
619 }