1f98a4219db73298eb4eb37bbde31edd5ab5b4d0
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/Conv.h>
17 #include <folly/Exception.h>
18 #include <folly/File.h>
19 #include <folly/FileUtil.h>
20 #include <folly/String.h>
21 #include <folly/experimental/TestUtil.h>
22 #include <folly/experimental/logging/AsyncFileWriter.h>
23 #include <folly/experimental/logging/LoggerDB.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Unistd.h>
28
29 DEFINE_int64(
30     async_discard_num_writer_threads,
31     32,
32     "number of threads to use to generate log messages during "
33     "the AsyncFileWriter.discard test");
34 DEFINE_int64(
35     async_discard_messages_per_writer,
36     200000,
37     "number of messages each writer threads should generate in "
38     "the AsyncFileWriter.discard test");
39 DEFINE_int64(
40     async_discard_read_sleep_usec,
41     500,
42     "how long the read thread should sleep between reads in "
43     "the AsyncFileWriter.discard test");
44
45 using namespace folly;
46 using namespace std::literals::chrono_literals;
47 using folly::test::TemporaryFile;
48
49 TEST(AsyncFileWriter, noMessages) {
50   TemporaryFile tmpFile{"logging_test"};
51
52   // Test the simple construction and destruction of an AsyncFileWriter
53   // without ever writing any messages.  This still exercises the I/O
54   // thread start-up and shutdown code.
55   AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
56 }
57
58 TEST(AsyncFileWriter, simpleMessages) {
59   TemporaryFile tmpFile{"logging_test"};
60
61   {
62     AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
63     for (int n = 0; n < 10; ++n) {
64       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
65       sched_yield();
66     }
67   }
68
69   std::string data;
70   auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
71   ASSERT_TRUE(ret);
72
73   std::string expected =
74       "message 0\n"
75       "message 1\n"
76       "message 2\n"
77       "message 3\n"
78       "message 4\n"
79       "message 5\n"
80       "message 6\n"
81       "message 7\n"
82       "message 8\n"
83       "message 9\n";
84   EXPECT_EQ(expected, data);
85 }
86
87 #ifndef _WIN32
88 namespace {
89 static std::vector<std::string>* internalWarnings;
90
91 void handleLoggingError(
92     StringPiece /* file */,
93     int /* lineNumber */,
94     std::string&& msg) {
95   internalWarnings->emplace_back(std::move(msg));
96 }
97 }
98
99 TEST(AsyncFileWriter, ioError) {
100   // Set the LoggerDB internal warning handler so we can record the messages
101   std::vector<std::string> logErrors;
102   internalWarnings = &logErrors;
103   LoggerDB::setInternalWarningHandler(handleLoggingError);
104
105   // Create an AsyncFileWriter that refers to a pipe whose read end is closed
106   std::array<int, 2> fds;
107   auto rc = pipe(fds.data());
108   folly::checkUnixError(rc, "failed to create pipe");
109   signal(SIGPIPE, SIG_IGN);
110   ::close(fds[0]);
111
112   // Log a bunch of messages to the writer
113   size_t numMessages = 100;
114   {
115     AsyncFileWriter writer{folly::File{fds[1], true}};
116     for (size_t n = 0; n < numMessages; ++n) {
117       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
118       sched_yield();
119     }
120   }
121
122   LoggerDB::setInternalWarningHandler(nullptr);
123
124   // AsyncFileWriter should have some internal warning messages about the
125   // log failures.  This will generally be many fewer than the number of
126   // messages we wrote, though, since it performs write batching.
127   for (const auto& msg : logErrors) {
128     EXPECT_THAT(
129         msg,
130         testing::ContainsRegex(
131             "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
132   }
133   EXPECT_GT(logErrors.size(), 0);
134   EXPECT_LE(logErrors.size(), numMessages);
135 }
136 #endif
137
138 /**
139  * writeThread() writes a series of messages to the AsyncFileWriter
140  */
141 void writeThread(
142     AsyncFileWriter* writer,
143     size_t id,
144     size_t numMessages,
145     uint32_t flags) {
146   for (size_t n = 0; n < numMessages; ++n) {
147     writer->writeMessage(
148         folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
149   }
150 }
151
152 class ReadStats {
153  public:
154   ReadStats()
155       : readSleepUS_{static_cast<uint64_t>(
156             std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
157
158   void clearSleepDuration() {
159     readSleepUS_.store(0);
160   }
161   std::chrono::microseconds getSleepUS() const {
162     return std::chrono::microseconds{readSleepUS_.load()};
163   }
164
165   void check(size_t numThreads, size_t messagesPerThread) {
166     EXPECT_EQ("", trailingData_);
167     EXPECT_EQ(numThreads, writers_.size());
168     size_t totalMessagesReceived = 0;
169     for (const auto& writerData : writers_) {
170       EXPECT_LE(writerData.numMessages, messagesPerThread);
171       EXPECT_LE(writerData.lastId, messagesPerThread);
172       totalMessagesReceived += writerData.numMessages;
173     }
174
175     EXPECT_EQ(0, numUnableToParse_);
176     EXPECT_EQ(0, numOutOfOrder_);
177     EXPECT_EQ(
178         numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
179   }
180
181   /**
182    * Check that no messages were dropped from the specified thread.
183    */
184   void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
185     EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
186     EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
187   }
188
189   void messageReceived(StringPiece msg) {
190     if (msg.endsWith(" log messages discarded: "
191                      "logging faster than we can write")) {
192       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
193       fprintf(stderr, "received discard notification: %zu\n", discardCount);
194       numDiscarded_ += discardCount;
195       return;
196     }
197
198     size_t threadID = 0;
199     size_t messageIndex = 0;
200     try {
201       parseMessage(msg, &threadID, &messageIndex);
202     } catch (const std::exception& ex) {
203       ++numUnableToParse_;
204       fprintf(
205           stderr,
206           "unable to parse log message: %s\n",
207           folly::humanify(msg.str()).c_str());
208       return;
209     }
210
211     if (threadID >= writers_.size()) {
212       writers_.resize(threadID + 1);
213     }
214     writers_[threadID].numMessages++;
215     if (messageIndex > writers_[threadID].lastId) {
216       writers_[threadID].lastId = messageIndex;
217     } else {
218       ++numOutOfOrder_;
219       fprintf(
220           stderr,
221           "received out-of-order messages from writer %zu: "
222           "%zu received after %zu\n",
223           threadID,
224           messageIndex,
225           writers_[threadID].lastId);
226     }
227   }
228
229   void trailingData(StringPiece data) {
230     trailingData_ = data.str();
231   }
232
233  private:
234   struct WriterStats {
235     size_t numMessages{0};
236     size_t lastId{0};
237   };
238
239   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
240     constexpr StringPiece prefix{"thread "};
241     constexpr StringPiece middle{" message "};
242     if (!msg.startsWith(prefix)) {
243       throw std::runtime_error("bad message prefix");
244     }
245
246     auto idx = prefix.size();
247     auto end = msg.find(' ', idx);
248     if (end == StringPiece::npos) {
249       throw std::runtime_error("no middle found");
250     }
251
252     *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
253     auto rest = msg.subpiece(end);
254     if (!rest.startsWith(middle)) {
255       throw std::runtime_error("bad message middle");
256     }
257
258     rest.advance(middle.size());
259     *messageIndex = folly::to<size_t>(rest);
260   }
261
262   std::vector<WriterStats> writers_;
263   std::string trailingData_;
264   size_t numUnableToParse_{0};
265   size_t numOutOfOrder_{0};
266   size_t numDiscarded_{0};
267
268   std::atomic<uint64_t> readSleepUS_;
269 };
270
271 /**
272  * readThread() reads messages slowly from a pipe.  This helps test the
273  * AsyncFileWriter behavior when I/O is slow.
274  */
275 void readThread(folly::File&& file, ReadStats* stats) {
276   std::vector<char> buffer;
277   buffer.resize(1024);
278
279   size_t bufferIdx = 0;
280   while (true) {
281     /* sleep override */
282     usleep(stats->getSleepUS().count());
283
284     auto readResult = folly::readNoInt(
285         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
286     if (readResult < 0) {
287       fprintf(stderr, "error reading from pipe: %d\n", errno);
288       return;
289     }
290     if (readResult == 0) {
291       fprintf(stderr, "read EOF\n");
292       break;
293     }
294
295     auto logDataLen = bufferIdx + readResult;
296     StringPiece logData{buffer.data(), logDataLen};
297     auto idx = 0;
298     while (true) {
299       auto end = logData.find('\n', idx);
300       if (end == StringPiece::npos) {
301         bufferIdx = logDataLen - idx;
302         memmove(buffer.data(), buffer.data() + idx, bufferIdx);
303         break;
304       }
305
306       StringPiece logMsg{logData.data() + idx, end - idx};
307       stats->messageReceived(logMsg);
308       idx = end + 1;
309     }
310   }
311
312   if (bufferIdx != 0) {
313     stats->trailingData(StringPiece{buffer.data(), bufferIdx});
314   }
315 }
316
317 /*
318  * The discard test spawns a number of threads that each write a large number
319  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
320  * thread reads from it slowly, causing a backlog to build up.
321  *
322  * The test then checks that:
323  * - The read thread always receives full messages (no partial log messages)
324  * - Messages that are received are received in order
325  * - The number of messages received plus the number reported in discard
326  *   notifications matches the number of messages sent.
327  */
328 TEST(AsyncFileWriter, discard) {
329   std::array<int, 2> fds;
330   auto pipeResult = pipe(fds.data());
331   folly::checkUnixError(pipeResult, "pipe failed");
332   folly::File readPipe{fds[0], true};
333   folly::File writePipe{fds[1], true};
334
335   // This test should always be run with at least 2 writer threads.
336   // The last one will use the NEVER_DISCARD flag, and we want at least
337   // one that does discard messages.
338   ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
339
340   ReadStats readStats;
341   std::thread reader(readThread, std::move(readPipe), &readStats);
342   {
343     AsyncFileWriter writer{std::move(writePipe)};
344
345     std::vector<std::thread> writeThreads;
346     for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
347       uint32_t flags = 0;
348       // Configure the last writer thread to never drop messages
349       if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
350         flags = LogWriter::NEVER_DISCARD;
351       }
352
353       writeThreads.emplace_back(
354           writeThread,
355           &writer,
356           n,
357           FLAGS_async_discard_messages_per_writer,
358           flags);
359     }
360
361     for (auto& t : writeThreads) {
362       t.join();
363     }
364     fprintf(stderr, "writers done\n");
365   }
366   // Clear the read sleep duration so the reader will finish quickly now
367   readStats.clearSleepDuration();
368   reader.join();
369   readStats.check(
370       FLAGS_async_discard_num_writer_threads,
371       FLAGS_async_discard_messages_per_writer);
372   // Check that no messages were dropped from the thread using the
373   // NEVER_DISCARD flag.
374   readStats.checkNoDrops(
375       FLAGS_async_discard_num_writer_threads - 1,
376       FLAGS_async_discard_messages_per_writer);
377 }