logging: add AsyncFileWriter
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/Conv.h>
17 #include <folly/Exception.h>
18 #include <folly/File.h>
19 #include <folly/FileUtil.h>
20 #include <folly/String.h>
21 #include <folly/experimental/TestUtil.h>
22 #include <folly/experimental/logging/AsyncFileWriter.h>
23 #include <folly/experimental/logging/LoggerDB.h>
24 #include <folly/portability/GFlags.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Unistd.h>
28
29 DEFINE_int64(
30     async_discard_num_writer_threads,
31     32,
32     "number of threads to use to generate log messages during "
33     "the AsyncFileWriter.discard test");
34 DEFINE_int64(
35     async_discard_messages_per_writer,
36     200000,
37     "number of messages each writer threads should generate in "
38     "the AsyncFileWriter.discard test");
39 DEFINE_int64(
40     async_discard_read_sleep_usec,
41     500,
42     "how long the read thread should sleep between reads in "
43     "the AsyncFileWriter.discard test");
44
45 using namespace folly;
46 using folly::test::TemporaryFile;
47
48 TEST(AsyncFileWriter, noMessages) {
49   TemporaryFile tmpFile{"logging_test"};
50
51   // Test the simple construction and destruction of an AsyncFileWriter
52   // without ever writing any messages.  This still exercises the I/O
53   // thread start-up and shutdown code.
54   AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
55 }
56
57 TEST(AsyncFileWriter, simpleMessages) {
58   TemporaryFile tmpFile{"logging_test"};
59
60   {
61     AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
62     for (int n = 0; n < 10; ++n) {
63       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
64       sched_yield();
65     }
66   }
67
68   std::string data;
69   auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
70   ASSERT_TRUE(ret);
71
72   std::string expected =
73       "message 0\n"
74       "message 1\n"
75       "message 2\n"
76       "message 3\n"
77       "message 4\n"
78       "message 5\n"
79       "message 6\n"
80       "message 7\n"
81       "message 8\n"
82       "message 9\n";
83   EXPECT_EQ(expected, data);
84 }
85
86 #ifndef _WIN32
87 namespace {
88 static std::vector<std::string>* internalWarnings;
89
90 void handleLoggingError(
91     StringPiece /* file */,
92     int /* lineNumber */,
93     std::string&& msg) {
94   internalWarnings->emplace_back(std::move(msg));
95 }
96 }
97
98 TEST(AsyncFileWriter, ioError) {
99   // Set the LoggerDB internal warning handler so we can record the messages
100   std::vector<std::string> logErrors;
101   internalWarnings = &logErrors;
102   LoggerDB::setInternalWarningHandler(handleLoggingError);
103
104   // Create an AsyncFileWriter that refers to a pipe whose read end is closed
105   std::array<int, 2> fds;
106   auto rc = pipe(fds.data());
107   folly::checkUnixError(rc, "failed to create pipe");
108   signal(SIGPIPE, SIG_IGN);
109   ::close(fds[0]);
110
111   // Log a bunch of messages to the writer
112   size_t numMessages = 100;
113   {
114     AsyncFileWriter writer{folly::File{fds[1], true}};
115     for (size_t n = 0; n < numMessages; ++n) {
116       writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
117       sched_yield();
118     }
119   }
120
121   LoggerDB::setInternalWarningHandler(nullptr);
122
123   // AsyncFileWriter should have some internal warning messages about the
124   // log failures.  This will generally be many fewer than the number of
125   // messages we wrote, though, since it performs write batching.
126   for (const auto& msg : logErrors) {
127     EXPECT_THAT(
128         msg,
129         testing::ContainsRegex(
130             "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
131   }
132   EXPECT_GT(logErrors.size(), 0);
133   EXPECT_LE(logErrors.size(), numMessages);
134 }
135 #endif
136
137 /**
138  * writeThread() writes a series of messages to the AsyncFileWriter
139  */
140 void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
141   for (size_t n = 0; n < numMessages; ++n) {
142     writer->writeMessage(
143         folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
144   }
145 }
146
147 class ReadStats {
148  public:
149   void check(size_t numThreads, size_t messagesPerThread) {
150     EXPECT_EQ("", trailingData_);
151     EXPECT_EQ(numThreads, writers_.size());
152     size_t totalMessagesReceived = 0;
153     for (const auto& writerData : writers_) {
154       EXPECT_LE(writerData.numMessages, messagesPerThread);
155       EXPECT_LE(writerData.lastId, messagesPerThread);
156       totalMessagesReceived += writerData.numMessages;
157     }
158
159     EXPECT_EQ(0, numUnableToParse_);
160     EXPECT_EQ(0, numOutOfOrder_);
161     EXPECT_EQ(
162         numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
163   }
164
165   void messageReceived(StringPiece msg) {
166     if (msg.endsWith(" log messages discarded: "
167                      "logging faster than we can write")) {
168       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
169       fprintf(stderr, "received discard notification: %zu\n", discardCount);
170       numDiscarded_ += discardCount;
171       return;
172     }
173
174     size_t threadID = 0;
175     size_t messageIndex = 0;
176     try {
177       parseMessage(msg, &threadID, &messageIndex);
178     } catch (const std::exception& ex) {
179       ++numUnableToParse_;
180       fprintf(
181           stderr,
182           "unable to parse log message: %s\n",
183           folly::humanify(msg.str()).c_str());
184       return;
185     }
186
187     if (threadID >= writers_.size()) {
188       writers_.resize(threadID + 1);
189     }
190     writers_[threadID].numMessages++;
191     if (messageIndex > writers_[threadID].lastId) {
192       writers_[threadID].lastId = messageIndex;
193     } else {
194       ++numOutOfOrder_;
195       fprintf(
196           stderr,
197           "received out-of-order messages from writer %zu: "
198           "%zu received after %zu\n",
199           threadID,
200           messageIndex,
201           writers_[threadID].lastId);
202     }
203   }
204
205   void trailingData(StringPiece data) {
206     trailingData_ = data.str();
207   }
208
209  private:
210   struct WriterStats {
211     size_t numMessages{0};
212     size_t lastId{0};
213   };
214
215   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
216     constexpr StringPiece prefix{"thread "};
217     constexpr StringPiece middle{" message "};
218     if (!msg.startsWith(prefix)) {
219       throw std::runtime_error("bad message prefix");
220     }
221
222     auto idx = prefix.size();
223     auto end = msg.find(' ', idx);
224     if (end == StringPiece::npos) {
225       throw std::runtime_error("no middle found");
226     }
227
228     *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
229     auto rest = msg.subpiece(end);
230     if (!rest.startsWith(middle)) {
231       throw std::runtime_error("bad message middle");
232     }
233
234     rest.advance(middle.size());
235     *messageIndex = folly::to<size_t>(rest);
236   }
237
238   std::vector<WriterStats> writers_;
239   std::string trailingData_;
240   size_t numUnableToParse_{0};
241   size_t numOutOfOrder_{0};
242   size_t numDiscarded_{0};
243 };
244
245 /**
246  * readThread() reads messages slowly from a pipe.  This helps test the
247  * AsyncFileWriter behavior when I/O is slow.
248  */
249 void readThread(folly::File&& file, ReadStats* stats) {
250   std::vector<char> buffer;
251   buffer.resize(1024);
252
253   size_t bufferIdx = 0;
254   while (true) {
255     /* sleep override */
256     usleep(FLAGS_async_discard_read_sleep_usec);
257
258     auto readResult = folly::readNoInt(
259         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
260     if (readResult < 0) {
261       fprintf(stderr, "error reading from pipe: %d\n", errno);
262       return;
263     }
264     if (readResult == 0) {
265       fprintf(stderr, "read EOF\n");
266       break;
267     }
268
269     auto logDataLen = bufferIdx + readResult;
270     StringPiece logData{buffer.data(), logDataLen};
271     auto idx = 0;
272     while (true) {
273       auto end = logData.find('\n', idx);
274       if (end == StringPiece::npos) {
275         bufferIdx = logDataLen - idx;
276         memmove(buffer.data(), buffer.data() + idx, bufferIdx);
277         break;
278       }
279
280       StringPiece logMsg{logData.data() + idx, end - idx};
281       stats->messageReceived(logMsg);
282       idx = end + 1;
283     }
284   }
285
286   if (bufferIdx != 0) {
287     stats->trailingData(StringPiece{buffer.data(), bufferIdx});
288   }
289 }
290
291 /*
292  * The discard test spawns a number of threads that each write a large number
293  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
294  * thread reads from it slowly, causing a backlog to build up.
295  *
296  * The test then checks that:
297  * - The read thread always receives full messages (no partial log messages)
298  * - Messages that are received are received in order
299  * - The number of messages received plus the number reported in discard
300  *   notifications matches the number of messages sent.
301  */
302 TEST(AsyncFileWriter, discard) {
303   std::array<int, 2> fds;
304   auto pipeResult = pipe(fds.data());
305   folly::checkUnixError(pipeResult, "pipe failed");
306   folly::File readPipe{fds[0], true};
307   folly::File writePipe{fds[1], true};
308
309   ReadStats readStats;
310   std::thread reader(readThread, std::move(readPipe), &readStats);
311   {
312     AsyncFileWriter writer{std::move(writePipe)};
313
314     std::vector<std::thread> writeThreads;
315     for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
316       writeThreads.emplace_back(
317           writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
318     }
319
320     for (auto& t : writeThreads) {
321       t.join();
322     }
323     fprintf(stderr, "writers done\n");
324   }
325   reader.join();
326   readStats.check(
327       FLAGS_async_discard_num_writer_threads,
328       FLAGS_async_discard_messages_per_writer);
329 }