"the AsyncFileWriter.discard test");
using namespace folly;
+using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
TEST(AsyncFileWriter, noMessages) {
/**
* writeThread() writes a series of messages to the AsyncFileWriter
*/
-void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
+void writeThread(
+ AsyncFileWriter* writer,
+ size_t id,
+ size_t numMessages,
+ uint32_t flags) {
for (size_t n = 0; n < numMessages; ++n) {
writer->writeMessage(
- folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
+ folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
}
}
class ReadStats {
public:
+ ReadStats()
+ : readSleepUS_{static_cast<uint64_t>(
+ std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
+
+ void clearSleepDuration() {
+ readSleepUS_.store(0);
+ }
+ std::chrono::microseconds getSleepUS() const {
+ return std::chrono::microseconds{readSleepUS_.load()};
+ }
+
void check(size_t numThreads, size_t messagesPerThread) {
EXPECT_EQ("", trailingData_);
EXPECT_EQ(numThreads, writers_.size());
numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
}
+ /**
+ * Check that no messages were dropped from the specified thread.
+ */
+ void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
+ EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
+ EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+ }
+
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
+
+ std::atomic<uint64_t> readSleepUS_;
};
/**
size_t bufferIdx = 0;
while (true) {
/* sleep override */
- usleep(FLAGS_async_discard_read_sleep_usec);
+ usleep(stats->getSleepUS().count());
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
+ // This test should always be run with at least 2 writer threads.
+ // The last one will use the NEVER_DISCARD flag, and we want at least
+ // one that does discard messages.
+ ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
+
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
std::vector<std::thread> writeThreads;
for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+ uint32_t flags = 0;
+ // Configure the last writer thread to never drop messages
+ if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+ flags = LogWriter::NEVER_DISCARD;
+ }
+
writeThreads.emplace_back(
- writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
+ writeThread,
+ &writer,
+ n,
+ FLAGS_async_discard_messages_per_writer,
+ flags);
}
for (auto& t : writeThreads) {
}
fprintf(stderr, "writers done\n");
}
+ // Clear the read sleep duration so the reader will finish quickly now
+ readStats.clearSleepDuration();
reader.join();
readStats.check(
FLAGS_async_discard_num_writer_threads,
FLAGS_async_discard_messages_per_writer);
+ // Check that no messages were dropped from the thread using the
+ // NEVER_DISCARD flag.
+ readStats.checkNoDrops(
+ FLAGS_async_discard_num_writer_threads - 1,
+ FLAGS_async_discard_messages_per_writer);
}