Fix copyright lines
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <thread>
17
18 #include <folly/Conv.h>
19 #include <folly/Exception.h>
20 #include <folly/File.h>
21 #include <folly/FileUtil.h>
22 #include <folly/String.h>
23 #include <folly/Synchronized.h>
24 #include <folly/experimental/TestUtil.h>
25 #include <folly/experimental/logging/AsyncFileWriter.h>
26 #include <folly/experimental/logging/Init.h>
27 #include <folly/experimental/logging/LoggerDB.h>
28 #include <folly/experimental/logging/xlog.h>
29 #include <folly/futures/Future.h>
30 #include <folly/futures/Promise.h>
31 #include <folly/init/Init.h>
32 #include <folly/lang/SafeAssert.h>
33 #include <folly/portability/GFlags.h>
34 #include <folly/portability/GMock.h>
35 #include <folly/portability/GTest.h>
36 #include <folly/portability/Unistd.h>
37
38 DEFINE_string(logging, "", "folly::logging configuration");
39 DEFINE_int64(
40     async_discard_num_normal_writers,
41     30,
42     "number of threads to use to generate normal log messages during "
43     "the AsyncFileWriter.discard test");
44 DEFINE_int64(
45     async_discard_num_nodiscard_writers,
46     2,
47     "number of threads to use to generate non-discardable log messages during "
48     "the AsyncFileWriter.discard test");
49 DEFINE_int64(
50     async_discard_read_sleep_usec,
51     500,
52     "how long the read thread should sleep between reads in "
53     "the AsyncFileWriter.discard test");
54 DEFINE_int64(
55     async_discard_timeout_msec,
56     10000,
57     "A timeout for the AsyncFileWriter.discard test if it cannot generate "
58     "enough discards");
59 DEFINE_int64(
60     async_discard_num_events,
61     10,
62     "The number of discard events to wait for in the AsyncFileWriter.discard "
63     "test");
64
65 using namespace folly;
66 using namespace std::literals::chrono_literals;
67 using folly::test::TemporaryFile;
68 using std::chrono::steady_clock;
69 using std::chrono::milliseconds;
70
71 TEST(AsyncFileWriter, noMessages) {
72   TemporaryFile tmpFile{"logging_test"};
73
74   // Test the simple construction and destruction of an AsyncFileWriter
75   // without ever writing any messages.  This still exercises the I/O
76   // thread start-up and shutdown code.
77   AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
78 }
79
80 TEST(AsyncFileWriter, simpleMessages) {
81   TemporaryFile tmpFile{"logging_test"};
82
83   {
84     AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
85     for (int n = 0; n < 10; ++n) {
86       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
87       sched_yield();
88     }
89   }
90   tmpFile.close();
91
92   std::string data;
93   auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
94   ASSERT_TRUE(ret);
95
96   std::string expected =
97       "message 0\n"
98       "message 1\n"
99       "message 2\n"
100       "message 3\n"
101       "message 4\n"
102       "message 5\n"
103       "message 6\n"
104       "message 7\n"
105       "message 8\n"
106       "message 9\n";
107   EXPECT_EQ(expected, data);
108 }
109
110 namespace {
111 static std::vector<std::string>* internalWarnings;
112
113 void handleLoggingError(
114     StringPiece /* file */,
115     int /* lineNumber */,
116     std::string&& msg) {
117   internalWarnings->emplace_back(std::move(msg));
118 }
119 } // namespace
120
121 TEST(AsyncFileWriter, ioError) {
122   // Set the LoggerDB internal warning handler so we can record the messages
123   std::vector<std::string> logErrors;
124   internalWarnings = &logErrors;
125   LoggerDB::setInternalWarningHandler(handleLoggingError);
126
127   // Create an AsyncFileWriter that refers to a pipe whose read end is closed
128   std::array<int, 2> fds;
129   auto rc = pipe(fds.data());
130   folly::checkUnixError(rc, "failed to create pipe");
131 #ifndef _WIN32
132   signal(SIGPIPE, SIG_IGN);
133 #endif
134   ::close(fds[0]);
135
136   // Log a bunch of messages to the writer
137   size_t numMessages = 100;
138   {
139     AsyncFileWriter writer{folly::File{fds[1], true}};
140     for (size_t n = 0; n < numMessages; ++n) {
141       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
142       sched_yield();
143     }
144   }
145
146   LoggerDB::setInternalWarningHandler(nullptr);
147
148   // AsyncFileWriter should have some internal warning messages about the
149   // log failures.  This will generally be many fewer than the number of
150   // messages we wrote, though, since it performs write batching.
151   //
152   // GTest on Windows doesn't support alternation in the regex syntax -_-....
153   const std::string kExpectedErrorMessage =
154 #if _WIN32
155       // The `pipe` call above is actually implemented via sockets, so we get
156       // a different error message.
157       "An established connection was aborted by the software in your host machine\\.";
158 #else
159       "Broken pipe";
160 #endif
161
162   for (const auto& msg : logErrors) {
163     EXPECT_THAT(
164         msg,
165         testing::ContainsRegex(
166             "error writing to log file .* in AsyncFileWriter.*: " +
167             kExpectedErrorMessage));
168   }
169   EXPECT_GT(logErrors.size(), 0);
170   EXPECT_LE(logErrors.size(), numMessages);
171 }
172
173 namespace {
174 size_t fillUpPipe(int fd) {
175   int flags = fcntl(fd, F_GETFL);
176   folly::checkUnixError(flags, "failed get file descriptor flags");
177   auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
178   folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
179   std::vector<char> data;
180   data.resize(4000);
181   size_t totalBytes = 0;
182   size_t bytesToWrite = data.size();
183   while (true) {
184     auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
185     if (bytesWritten < 0) {
186       if (errno == EAGAIN || errno == EWOULDBLOCK) {
187         // We blocked.  Keep trying smaller writes, until we get down to a
188         // single byte, just to make sure the logging code really won't be able
189         // to write anything to the pipe.
190         if (bytesToWrite <= 1) {
191           break;
192         } else {
193           bytesToWrite /= 2;
194         }
195       } else {
196         throwSystemError("error writing to pipe");
197       }
198     } else {
199       totalBytes += bytesWritten;
200     }
201   }
202   XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
203
204   rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
205   folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
206
207   return totalBytes;
208 }
209 } // namespace
210
211 TEST(AsyncFileWriter, flush) {
212   // Set up a pipe(), then write data to the write endpoint until it fills up
213   // and starts blocking.
214   std::array<int, 2> fds;
215   auto rc = pipe(fds.data());
216   folly::checkUnixError(rc, "failed to create pipe");
217   File readPipe{fds[0], true};
218   File writePipe{fds[1], true};
219
220   auto paddingSize = fillUpPipe(writePipe.fd());
221
222   // Now set up an AsyncFileWriter pointing at the write end of the pipe
223   AsyncFileWriter writer{std::move(writePipe)};
224
225   // Write a message
226   writer.writeMessage("test message: " + std::string(200, 'x'));
227
228   // Call flush().  Use a separate thread, since this should block until we
229   // consume data from the pipe.
230   Promise<Unit> promise;
231   auto future = promise.getFuture();
232   auto flushFunction = [&] { writer.flush(); };
233   std::thread flushThread{
234       [&]() { promise.setTry(makeTryWith(flushFunction)); }};
235   // Detach the flush thread now rather than joining it at the end of the
236   // function.  This way if something goes wrong during the test we will fail
237   // with the real error, rather than crashing due to the std::thread
238   // destructor running on a still-joinable thread.
239   flushThread.detach();
240
241   // Sleep briefly, and make sure flush() still hasn't completed.
242   // If it has completed this doesn't necessarily indicate a bug in
243   // AsyncFileWriter, but instead indicates that our test code failed to
244   // successfully cause a blocking write.
245   /* sleep override */
246   std::this_thread::sleep_for(10ms);
247   EXPECT_FALSE(future.isReady());
248
249   // Now read from the pipe
250   std::vector<char> buf;
251   buf.resize(paddingSize);
252   readFull(readPipe.fd(), buf.data(), buf.size());
253
254   // Make sure flush completes successfully now
255   future.get(10ms);
256 }
257
258 // A large-ish message suffix, just to consume space and help fill up
259 // log buffers faster.
260 static constexpr StringPiece kMsgSuffix{
261     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
262     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
263     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
264     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
265
266 class ReadStats {
267  public:
268   ReadStats()
269       : deadline_{steady_clock::now() +
270                   milliseconds{FLAGS_async_discard_timeout_msec}},
271         readSleepUS_{static_cast<uint64_t>(
272             std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
273
274   void clearSleepDuration() {
275     readSleepUS_.store(0);
276   }
277   std::chrono::microseconds getSleepUS() const {
278     return std::chrono::microseconds{readSleepUS_.load()};
279   }
280
281   bool shouldWriterStop() const {
282     // Stop after we have seen the required number of separate discard events.
283     // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
284     // ensures the async writer blocks and then makes progress again multiple
285     // times.
286     if (FLAGS_async_discard_num_events > 0 &&
287         discardEventsSeen_.load() >
288             static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
289       return true;
290     }
291
292     // Stop after a timeout, even if we don't hit the number of requested
293     // discards.
294     return steady_clock::now() > deadline_;
295   }
296   void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
297     auto map = perThreadWriteData_.wlock();
298     FOLLY_SAFE_CHECK(
299         map->find(threadID) == map->end(),
300         "multiple writer threads with same ID");
301     auto& data = (*map)[threadID];
302     data.numMessagesWritten = messagesWritten;
303     data.flags = flags;
304   }
305
306   void check() {
307     auto writeDataMap = perThreadWriteData_.wlock();
308
309     EXPECT_EQ("", trailingData_);
310     EXPECT_EQ(0, numUnableToParse_);
311     EXPECT_EQ(0, numOutOfOrder_);
312
313     // Check messages received from each writer thread
314     size_t readerStatsChecked = 0;
315     size_t totalMessagesWritten = 0;
316     size_t totalMessagesRead = 0;
317     for (const auto& writeEntry : *writeDataMap) {
318       const auto& writeInfo = writeEntry.second;
319       totalMessagesWritten += writeInfo.numMessagesWritten;
320
321       auto iter = perThreadReadData_.find(writeEntry.first);
322       if (iter == perThreadReadData_.end()) {
323         // We never received any messages from this writer thread.
324         // This is okay as long as this is not a NEVER_DISCARD writer.
325         EXPECT_EQ(0, writeInfo.flags);
326         continue;
327       }
328       const auto& readInfo = iter->second;
329       ++readerStatsChecked;
330       totalMessagesRead += readInfo.numMessagesRead;
331       if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
332         // Non-discarding threads should never discard anything
333         EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
334         EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
335       } else {
336         // Other threads may have discarded some messages
337         EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
338         EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
339       }
340     }
341
342     EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
343     EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
344
345     // This test is intended to check the discard behavior.
346     // Fail the test if we didn't actually trigger any discards before we timed
347     // out.
348     EXPECT_GT(numDiscarded_, 0);
349
350     XLOG(DBG1) << totalMessagesWritten << " messages written, "
351                << totalMessagesRead << " messages read, " << numDiscarded_
352                << " messages discarded";
353   }
354
355   void messageReceived(StringPiece msg) {
356     if (msg.endsWith(" log messages discarded: "
357                      "logging faster than we can write")) {
358       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
359       XLOG(DBG3, "received discard notification: ", discardCount);
360       numDiscarded_ += discardCount;
361       ++discardEventsSeen_;
362       return;
363     }
364
365     size_t threadID = 0;
366     size_t messageIndex = 0;
367     try {
368       parseMessage(msg, &threadID, &messageIndex);
369     } catch (const std::exception& ex) {
370       ++numUnableToParse_;
371       XLOG(ERR, "unable to parse log message: ", msg);
372       return;
373     }
374
375     auto& data = perThreadReadData_[threadID];
376     data.numMessagesRead++;
377     if (messageIndex > data.lastId) {
378       data.lastId = messageIndex;
379     } else {
380       ++numOutOfOrder_;
381       XLOG(ERR) << "received out-of-order messages from writer " << threadID
382                 << ": " << messageIndex << " received after " << data.lastId;
383     }
384   }
385
386   void trailingData(StringPiece data) {
387     trailingData_ = data.str();
388   }
389
390  private:
391   struct ReaderData {
392     size_t numMessagesRead{0};
393     size_t lastId{0};
394   };
395   struct WriterData {
396     size_t numMessagesWritten{0};
397     int flags{0};
398   };
399
400   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
401     // Validate and strip off the message prefix and suffix
402     constexpr StringPiece prefix{"thread "};
403     if (!msg.startsWith(prefix)) {
404       throw std::runtime_error("bad message prefix");
405     }
406     msg.advance(prefix.size());
407     if (!msg.endsWith(kMsgSuffix)) {
408       throw std::runtime_error("bad message suffix");
409     }
410     msg.subtract(kMsgSuffix.size());
411
412     // Parse then strip off the thread index
413     auto threadIDEnd = msg.find(' ');
414     if (threadIDEnd == StringPiece::npos) {
415       throw std::runtime_error("no middle found");
416     }
417     *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
418     msg.advance(threadIDEnd);
419
420     // Validate that the middle of the message is what we expect,
421     // then strip it off
422     constexpr StringPiece middle{" message "};
423     if (!msg.startsWith(middle)) {
424       throw std::runtime_error("bad message middle");
425     }
426     msg.advance(middle.size());
427
428     // Parse the message index
429     *messageIndex = folly::to<size_t>(msg);
430   }
431
432   /**
433    * Data about each writer thread, as recorded by the reader thread.
434    *
435    * At the end of the test we will compare perThreadReadData_ (recorded by the
436    * reader) with perThreadWriteData_ (recorded by the writers) to make sure
437    * the data matches up.
438    *
439    * This is a map from writer_thread_id to ReaderData.
440    * The writer_thread_id is extracted from the received messages.
441    *
442    * This field does not need locking as it is only updated by the single
443    * reader thread.
444    */
445   std::unordered_map<size_t, ReaderData> perThreadReadData_;
446
447   /*
448    * Additional information recorded by the reader thread.
449    */
450   std::string trailingData_;
451   size_t numUnableToParse_{0};
452   size_t numOutOfOrder_{0};
453   size_t numDiscarded_{0};
454
455   /**
456    * deadline_ is a maximum end time for the test.
457    *
458    * The writer threads quit if the deadline is reached even if they have not
459    * produced the desired number of discard events yet.
460    */
461   const std::chrono::steady_clock::time_point deadline_;
462
463   /**
464    * How long the reader thread should sleep between each read event.
465    *
466    * This is initially set to a non-zero value (read from the
467    * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
468    * slowly, which will fill up the pipe buffer and cause discard events.
469    *
470    * Once we have produce enough discards and are ready to finish the test the
471    * main thread reduces readSleepUS_ to 0, so the reader will finish the
472    * remaining message backlog quickly.
473    */
474   std::atomic<uint64_t> readSleepUS_{0};
475
476   /**
477    * A count of how many discard events have been seen so far.
478    *
479    * The reader increments discardEventsSeen_ each time it sees a discard
480    * notification message.  A "discard event" basically corresponds to a single
481    * group of dropped messages.  Once the reader pulls some messages off out of
482    * the pipe the writers should be able to send more data, but the buffer will
483    * eventually fill up again, producing another discard event.
484    */
485   std::atomic<uint64_t> discardEventsSeen_{0};
486
487   /**
488    * Data about each writer thread, as recorded by the writers.
489    *
490    * When each writer thread finishes it records how many messages it wrote,
491    * plus the flags it used to write the messages.
492    */
493   folly::Synchronized<std::unordered_map<size_t, WriterData>>
494       perThreadWriteData_;
495 };
496
497 /**
498  * readThread() reads messages slowly from a pipe.  This helps test the
499  * AsyncFileWriter behavior when I/O is slow.
500  */
501 void readThread(folly::File&& file, ReadStats* stats) {
502   std::vector<char> buffer;
503   buffer.resize(1024);
504
505   size_t bufferIdx = 0;
506   while (true) {
507     /* sleep override */
508     std::this_thread::sleep_for(stats->getSleepUS());
509
510     auto readResult = folly::readNoInt(
511         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
512     if (readResult < 0) {
513       XLOG(ERR, "error reading from pipe: ", errno);
514       return;
515     }
516     if (readResult == 0) {
517       XLOG(DBG2, "read EOF");
518       break;
519     }
520
521     auto logDataLen = bufferIdx + readResult;
522     StringPiece logData{buffer.data(), logDataLen};
523     auto idx = 0;
524     while (true) {
525       auto end = logData.find('\n', idx);
526       if (end == StringPiece::npos) {
527         bufferIdx = logDataLen - idx;
528         memmove(buffer.data(), buffer.data() + idx, bufferIdx);
529         break;
530       }
531
532       StringPiece logMsg{logData.data() + idx, end - idx};
533       stats->messageReceived(logMsg);
534       idx = end + 1;
535     }
536   }
537
538   if (bufferIdx != 0) {
539     stats->trailingData(StringPiece{buffer.data(), bufferIdx});
540   }
541 }
542
543 /**
544  * writeThread() writes a series of messages to the AsyncFileWriter
545  */
546 void writeThread(
547     AsyncFileWriter* writer,
548     size_t id,
549     uint32_t flags,
550     ReadStats* readStats) {
551   size_t msgID = 0;
552   while (true) {
553     ++msgID;
554     writer->writeMessage(
555         folly::to<std::string>(
556             "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
557         flags);
558
559     // Break out once the reader has seen enough discards
560     if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
561       readStats->writerFinished(id, msgID, flags);
562       break;
563     }
564   }
565 }
566
567 /*
568  * The discard test spawns a number of threads that each write a large number
569  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
570  * thread reads from it slowly, causing a backlog to build up.
571  *
572  * The test then checks that:
573  * - The read thread always receives full messages (no partial log messages)
574  * - Messages that are received are received in order
575  * - The number of messages received plus the number reported in discard
576  *   notifications matches the number of messages sent.
577  */
578 TEST(AsyncFileWriter, discard) {
579   std::array<int, 2> fds;
580   auto pipeResult = pipe(fds.data());
581   folly::checkUnixError(pipeResult, "pipe failed");
582   folly::File readPipe{fds[0], true};
583   folly::File writePipe{fds[1], true};
584
585   ReadStats readStats;
586   std::thread reader(readThread, std::move(readPipe), &readStats);
587   {
588     AsyncFileWriter writer{std::move(writePipe)};
589
590     std::vector<std::thread> writeThreads;
591     size_t numThreads = FLAGS_async_discard_num_normal_writers +
592         FLAGS_async_discard_num_nodiscard_writers;
593
594     for (size_t n = 0; n < numThreads; ++n) {
595       uint32_t flags = 0;
596       if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
597         flags = LogWriter::NEVER_DISCARD;
598       }
599       XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
600
601       writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
602     }
603
604     for (auto& t : writeThreads) {
605       t.join();
606     }
607     XLOG(DBG2, "writers done");
608   }
609   // Clear the read sleep duration so the reader will finish quickly now
610   readStats.clearSleepDuration();
611   reader.join();
612   readStats.check();
613 }
614
615 int main(int argc, char* argv[]) {
616   testing::InitGoogleTest(&argc, argv);
617   folly::init(&argc, &argv);
618   folly::initLogging(FLAGS_logging);
619
620   return RUN_ALL_TESTS();
621 }