logging: add a LogHandler::flush() call
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <thread>
17
18 #include <folly/Conv.h>
19 #include <folly/Exception.h>
20 #include <folly/File.h>
21 #include <folly/FileUtil.h>
22 #include <folly/String.h>
23 #include <folly/experimental/TestUtil.h>
24 #include <folly/experimental/logging/AsyncFileWriter.h>
25 #include <folly/experimental/logging/LoggerDB.h>
26 #include <folly/futures/Future.h>
27 #include <folly/futures/Promise.h>
28 #include <folly/portability/GFlags.h>
29 #include <folly/portability/GMock.h>
30 #include <folly/portability/GTest.h>
31 #include <folly/portability/Unistd.h>
32
33 DEFINE_int64(
34     async_discard_num_writer_threads,
35     32,
36     "number of threads to use to generate log messages during "
37     "the AsyncFileWriter.discard test");
38 DEFINE_int64(
39     async_discard_messages_per_writer,
40     200000,
41     "number of messages each writer threads should generate in "
42     "the AsyncFileWriter.discard test");
43 DEFINE_int64(
44     async_discard_read_sleep_usec,
45     500,
46     "how long the read thread should sleep between reads in "
47     "the AsyncFileWriter.discard test");
48
49 using namespace folly;
50 using namespace std::literals::chrono_literals;
51 using folly::test::TemporaryFile;
52
53 TEST(AsyncFileWriter, noMessages) {
54   TemporaryFile tmpFile{"logging_test"};
55
56   // Test the simple construction and destruction of an AsyncFileWriter
57   // without ever writing any messages.  This still exercises the I/O
58   // thread start-up and shutdown code.
59   AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
60 }
61
62 TEST(AsyncFileWriter, simpleMessages) {
63   TemporaryFile tmpFile{"logging_test"};
64
65   {
66     AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
67     for (int n = 0; n < 10; ++n) {
68       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
69       sched_yield();
70     }
71   }
72
73   std::string data;
74   auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
75   ASSERT_TRUE(ret);
76
77   std::string expected =
78       "message 0\n"
79       "message 1\n"
80       "message 2\n"
81       "message 3\n"
82       "message 4\n"
83       "message 5\n"
84       "message 6\n"
85       "message 7\n"
86       "message 8\n"
87       "message 9\n";
88   EXPECT_EQ(expected, data);
89 }
90
91 #ifndef _WIN32
92 namespace {
93 static std::vector<std::string>* internalWarnings;
94
95 void handleLoggingError(
96     StringPiece /* file */,
97     int /* lineNumber */,
98     std::string&& msg) {
99   internalWarnings->emplace_back(std::move(msg));
100 }
101 }
102
103 TEST(AsyncFileWriter, ioError) {
104   // Set the LoggerDB internal warning handler so we can record the messages
105   std::vector<std::string> logErrors;
106   internalWarnings = &logErrors;
107   LoggerDB::setInternalWarningHandler(handleLoggingError);
108
109   // Create an AsyncFileWriter that refers to a pipe whose read end is closed
110   std::array<int, 2> fds;
111   auto rc = pipe(fds.data());
112   folly::checkUnixError(rc, "failed to create pipe");
113   signal(SIGPIPE, SIG_IGN);
114   ::close(fds[0]);
115
116   // Log a bunch of messages to the writer
117   size_t numMessages = 100;
118   {
119     AsyncFileWriter writer{folly::File{fds[1], true}};
120     for (size_t n = 0; n < numMessages; ++n) {
121       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
122       sched_yield();
123     }
124   }
125
126   LoggerDB::setInternalWarningHandler(nullptr);
127
128   // AsyncFileWriter should have some internal warning messages about the
129   // log failures.  This will generally be many fewer than the number of
130   // messages we wrote, though, since it performs write batching.
131   for (const auto& msg : logErrors) {
132     EXPECT_THAT(
133         msg,
134         testing::ContainsRegex(
135             "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
136   }
137   EXPECT_GT(logErrors.size(), 0);
138   EXPECT_LE(logErrors.size(), numMessages);
139 }
140
141 namespace {
142 size_t fillUpPipe(int fd) {
143   int flags = fcntl(fd, F_GETFL);
144   folly::checkUnixError(flags, "failed get file descriptor flags");
145   auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
146   folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
147   std::vector<char> data;
148   data.resize(4000);
149   size_t totalBytes = 0;
150   size_t bytesToWrite = data.size();
151   while (true) {
152     auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
153     if (bytesWritten < 0) {
154       if (errno == EAGAIN || errno == EWOULDBLOCK) {
155         // We blocked.  Keep trying smaller writes, until we get down to a
156         // single byte, just to make sure the logging code really won't be able
157         // to write anything to the pipe.
158         if (bytesToWrite <= 1) {
159           break;
160         } else {
161           bytesToWrite /= 2;
162         }
163       } else {
164         throwSystemError("error writing to pipe");
165       }
166     } else {
167       totalBytes += bytesWritten;
168     }
169   }
170   fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
171
172   rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
173   folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
174
175   return totalBytes;
176 }
177 }
178
179 TEST(AsyncFileWriter, flush) {
180   // Set up a pipe(), then write data to the write endpoint until it fills up
181   // and starts blocking.
182   std::array<int, 2> fds;
183   auto rc = pipe(fds.data());
184   folly::checkUnixError(rc, "failed to create pipe");
185   File readPipe{fds[0], true};
186   File writePipe{fds[1], true};
187
188   auto paddingSize = fillUpPipe(writePipe.fd());
189
190   // Now set up an AsyncFileWriter pointing at the write end of the pipe
191   AsyncFileWriter writer{std::move(writePipe)};
192
193   // Write a message
194   writer.writeMessage(std::string{"test message"});
195
196   // Call flush().  Use a separate thread, since this should block until we
197   // consume data from the pipe.
198   Promise<Unit> promise;
199   auto future = promise.getFuture();
200   auto flushFunction = [&] { writer.flush(); };
201   std::thread flushThread{
202       [&]() { promise.setTry(makeTryWith(flushFunction)); }};
203
204   // Sleep briefly, and make sure flush() still hasn't completed.
205   /* sleep override */
206   std::this_thread::sleep_for(10ms);
207   EXPECT_FALSE(future.isReady());
208
209   // Now read from the pipe
210   std::vector<char> buf;
211   buf.resize(paddingSize);
212   readFull(readPipe.fd(), buf.data(), buf.size());
213
214   // Make sure flush completes successfully now
215   future.get(10ms);
216   flushThread.join();
217 }
218 #endif
219
220 /**
221  * writeThread() writes a series of messages to the AsyncFileWriter
222  */
223 void writeThread(
224     AsyncFileWriter* writer,
225     size_t id,
226     size_t numMessages,
227     uint32_t flags) {
228   for (size_t n = 0; n < numMessages; ++n) {
229     writer->writeMessage(
230         folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
231   }
232 }
233
234 class ReadStats {
235  public:
236   ReadStats()
237       : readSleepUS_{static_cast<uint64_t>(
238             std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
239
240   void clearSleepDuration() {
241     readSleepUS_.store(0);
242   }
243   std::chrono::microseconds getSleepUS() const {
244     return std::chrono::microseconds{readSleepUS_.load()};
245   }
246
247   void check(size_t numThreads, size_t messagesPerThread) {
248     EXPECT_EQ("", trailingData_);
249     EXPECT_EQ(numThreads, writers_.size());
250     size_t totalMessagesReceived = 0;
251     for (const auto& writerData : writers_) {
252       EXPECT_LE(writerData.numMessages, messagesPerThread);
253       EXPECT_LE(writerData.lastId, messagesPerThread);
254       totalMessagesReceived += writerData.numMessages;
255     }
256
257     EXPECT_EQ(0, numUnableToParse_);
258     EXPECT_EQ(0, numOutOfOrder_);
259     EXPECT_EQ(
260         numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
261   }
262
263   /**
264    * Check that no messages were dropped from the specified thread.
265    */
266   void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
267     EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
268     EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
269   }
270
271   void messageReceived(StringPiece msg) {
272     if (msg.endsWith(" log messages discarded: "
273                      "logging faster than we can write")) {
274       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
275       fprintf(stderr, "received discard notification: %zu\n", discardCount);
276       numDiscarded_ += discardCount;
277       return;
278     }
279
280     size_t threadID = 0;
281     size_t messageIndex = 0;
282     try {
283       parseMessage(msg, &threadID, &messageIndex);
284     } catch (const std::exception& ex) {
285       ++numUnableToParse_;
286       fprintf(
287           stderr,
288           "unable to parse log message: %s\n",
289           folly::humanify(msg.str()).c_str());
290       return;
291     }
292
293     if (threadID >= writers_.size()) {
294       writers_.resize(threadID + 1);
295     }
296     writers_[threadID].numMessages++;
297     if (messageIndex > writers_[threadID].lastId) {
298       writers_[threadID].lastId = messageIndex;
299     } else {
300       ++numOutOfOrder_;
301       fprintf(
302           stderr,
303           "received out-of-order messages from writer %zu: "
304           "%zu received after %zu\n",
305           threadID,
306           messageIndex,
307           writers_[threadID].lastId);
308     }
309   }
310
311   void trailingData(StringPiece data) {
312     trailingData_ = data.str();
313   }
314
315  private:
316   struct WriterStats {
317     size_t numMessages{0};
318     size_t lastId{0};
319   };
320
321   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
322     constexpr StringPiece prefix{"thread "};
323     constexpr StringPiece middle{" message "};
324     if (!msg.startsWith(prefix)) {
325       throw std::runtime_error("bad message prefix");
326     }
327
328     auto idx = prefix.size();
329     auto end = msg.find(' ', idx);
330     if (end == StringPiece::npos) {
331       throw std::runtime_error("no middle found");
332     }
333
334     *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
335     auto rest = msg.subpiece(end);
336     if (!rest.startsWith(middle)) {
337       throw std::runtime_error("bad message middle");
338     }
339
340     rest.advance(middle.size());
341     *messageIndex = folly::to<size_t>(rest);
342   }
343
344   std::vector<WriterStats> writers_;
345   std::string trailingData_;
346   size_t numUnableToParse_{0};
347   size_t numOutOfOrder_{0};
348   size_t numDiscarded_{0};
349
350   std::atomic<uint64_t> readSleepUS_;
351 };
352
353 /**
354  * readThread() reads messages slowly from a pipe.  This helps test the
355  * AsyncFileWriter behavior when I/O is slow.
356  */
357 void readThread(folly::File&& file, ReadStats* stats) {
358   std::vector<char> buffer;
359   buffer.resize(1024);
360
361   size_t bufferIdx = 0;
362   while (true) {
363     /* sleep override */
364     std::this_thread::sleep_for(stats->getSleepUS());
365
366     auto readResult = folly::readNoInt(
367         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
368     if (readResult < 0) {
369       fprintf(stderr, "error reading from pipe: %d\n", errno);
370       return;
371     }
372     if (readResult == 0) {
373       fprintf(stderr, "read EOF\n");
374       break;
375     }
376
377     auto logDataLen = bufferIdx + readResult;
378     StringPiece logData{buffer.data(), logDataLen};
379     auto idx = 0;
380     while (true) {
381       auto end = logData.find('\n', idx);
382       if (end == StringPiece::npos) {
383         bufferIdx = logDataLen - idx;
384         memmove(buffer.data(), buffer.data() + idx, bufferIdx);
385         break;
386       }
387
388       StringPiece logMsg{logData.data() + idx, end - idx};
389       stats->messageReceived(logMsg);
390       idx = end + 1;
391     }
392   }
393
394   if (bufferIdx != 0) {
395     stats->trailingData(StringPiece{buffer.data(), bufferIdx});
396   }
397 }
398
399 /*
400  * The discard test spawns a number of threads that each write a large number
401  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
402  * thread reads from it slowly, causing a backlog to build up.
403  *
404  * The test then checks that:
405  * - The read thread always receives full messages (no partial log messages)
406  * - Messages that are received are received in order
407  * - The number of messages received plus the number reported in discard
408  *   notifications matches the number of messages sent.
409  */
410 TEST(AsyncFileWriter, discard) {
411   std::array<int, 2> fds;
412   auto pipeResult = pipe(fds.data());
413   folly::checkUnixError(pipeResult, "pipe failed");
414   folly::File readPipe{fds[0], true};
415   folly::File writePipe{fds[1], true};
416
417   // This test should always be run with at least 2 writer threads.
418   // The last one will use the NEVER_DISCARD flag, and we want at least
419   // one that does discard messages.
420   ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
421
422   ReadStats readStats;
423   std::thread reader(readThread, std::move(readPipe), &readStats);
424   {
425     AsyncFileWriter writer{std::move(writePipe)};
426
427     std::vector<std::thread> writeThreads;
428     for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
429       uint32_t flags = 0;
430       // Configure the last writer thread to never drop messages
431       if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
432         flags = LogWriter::NEVER_DISCARD;
433       }
434
435       writeThreads.emplace_back(
436           writeThread,
437           &writer,
438           n,
439           FLAGS_async_discard_messages_per_writer,
440           flags);
441     }
442
443     for (auto& t : writeThreads) {
444       t.join();
445     }
446     fprintf(stderr, "writers done\n");
447   }
448   // Clear the read sleep duration so the reader will finish quickly now
449   readStats.clearSleepDuration();
450   reader.join();
451   readStats.check(
452       FLAGS_async_discard_num_writer_threads,
453       FLAGS_async_discard_messages_per_writer);
454   // Check that no messages were dropped from the thread using the
455   // NEVER_DISCARD flag.
456   readStats.checkNoDrops(
457       FLAGS_async_discard_num_writer_threads - 1,
458       FLAGS_async_discard_messages_per_writer);
459 }