Summary:
Add a flag to the LogWriter so we can ensure that particular messages are never
discarded, even when the LogWriter is throttling messages.
This functionality will be necessary to implement `FB_LOG(FATAL)` to that we
can avoid discarding the reason for crashing.
Reviewed By: wez
Differential Revision:
D5189498
fbshipit-source-id:
dc4322ea5ba449a341cdbdc32afb0ed466019801
-void AsyncFileWriter::writeMessage(StringPiece buffer) {
- return writeMessage(buffer.str());
+void AsyncFileWriter::writeMessage(StringPiece buffer, uint32_t flags) {
+ return writeMessage(buffer.str(), flags);
-void AsyncFileWriter::writeMessage(std::string&& buffer) {
+void AsyncFileWriter::writeMessage(std::string&& buffer, uint32_t flags) {
auto data = data_.lock();
auto data = data_.lock();
- if (data->currentBufferSize >= data->maxBufferBytes) {
+ if ((data->currentBufferSize >= data->maxBufferBytes) &&
+ !(flags & NEVER_DISCARD)) {
++data->numDiscarded;
return;
}
++data->numDiscarded;
return;
}
- void writeMessage(folly::StringPiece buffer) override;
- void writeMessage(std::string&& buffer) override;
+ void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override;
+ void writeMessage(std::string&& buffer, uint32_t flags = 0) override;
/**
* Block until the I/O thread has finished writing all messages that
/**
* Block until the I/O thread has finished writing all messages that
ImmediateFileWriter::ImmediateFileWriter(folly::File&& file)
: file_{std::move(file)} {}
ImmediateFileWriter::ImmediateFileWriter(folly::File&& file)
: file_{std::move(file)} {}
-void ImmediateFileWriter::writeMessage(StringPiece buffer) {
+void ImmediateFileWriter::writeMessage(
+ StringPiece buffer,
+ uint32_t /* flags */) {
// Write the data.
// We are doing direct file descriptor writes here, so there is no buffering
// of log message data. Each message is immediately written to the output.
// Write the data.
// We are doing direct file descriptor writes here, so there is no buffering
// of log message data. Each message is immediately written to the output.
explicit ImmediateFileWriter(folly::File&& file);
using LogWriter::writeMessage;
explicit ImmediateFileWriter(folly::File&& file);
using LogWriter::writeMessage;
- void writeMessage(folly::StringPiece buffer) override;
+ void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override;
private:
ImmediateFileWriter(ImmediateFileWriter const&) = delete;
private:
ImmediateFileWriter(ImmediateFileWriter const&) = delete;
*/
class LogWriter {
public:
*/
class LogWriter {
public:
+ /**
+ * Bit flag values for use with writeMessage()
+ */
+ enum Flags : uint32_t {
+ NO_FLAGS = 0x00,
+ /**
+ * Ensure that this log message never gets discarded.
+ *
+ * Some LogWriter implementations may discard messages when messages are
+ * being received faster than they can be written. This flag ensures that
+ * this message will never be discarded.
+ *
+ * This flag is used to ensure that LOG(FATAL) messages never get
+ * discarded, so we always report the reason for a crash.
+ */
+ NEVER_DISCARD = 0x01,
+ };
+
virtual ~LogWriter() {}
/**
* Write a serialized log message.
virtual ~LogWriter() {}
/**
* Write a serialized log message.
+ *
+ * The flags parameter is a bitwise-ORed set of Flag values defined above.
- virtual void writeMessage(folly::StringPiece buffer) = 0;
+ virtual void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) = 0;
/**
* Write a serialized message.
/**
* Write a serialized message.
* writeMessage(), but subclasses may override this implementation if
* desired.
*/
* writeMessage(), but subclasses may override this implementation if
* desired.
*/
- virtual void writeMessage(std::string&& buffer) {
- writeMessage(folly::StringPiece{buffer});
+ virtual void writeMessage(std::string&& buffer, uint32_t flags = 0) {
+ writeMessage(folly::StringPiece{buffer}, flags);
"the AsyncFileWriter.discard test");
using namespace folly;
"the AsyncFileWriter.discard test");
using namespace folly;
+using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
TEST(AsyncFileWriter, noMessages) {
using folly::test::TemporaryFile;
TEST(AsyncFileWriter, noMessages) {
/**
* writeThread() writes a series of messages to the AsyncFileWriter
*/
/**
* writeThread() writes a series of messages to the AsyncFileWriter
*/
-void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
+void writeThread(
+ AsyncFileWriter* writer,
+ size_t id,
+ size_t numMessages,
+ uint32_t flags) {
for (size_t n = 0; n < numMessages; ++n) {
writer->writeMessage(
for (size_t n = 0; n < numMessages; ++n) {
writer->writeMessage(
- folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
+ folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
}
}
class ReadStats {
public:
}
}
class ReadStats {
public:
+ ReadStats()
+ : readSleepUS_{static_cast<uint64_t>(
+ std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
+
+ void clearSleepDuration() {
+ readSleepUS_.store(0);
+ }
+ std::chrono::microseconds getSleepUS() const {
+ return std::chrono::microseconds{readSleepUS_.load()};
+ }
+
void check(size_t numThreads, size_t messagesPerThread) {
EXPECT_EQ("", trailingData_);
EXPECT_EQ(numThreads, writers_.size());
void check(size_t numThreads, size_t messagesPerThread) {
EXPECT_EQ("", trailingData_);
EXPECT_EQ(numThreads, writers_.size());
numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
}
numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
}
+ /**
+ * Check that no messages were dropped from the specified thread.
+ */
+ void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
+ EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
+ EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+ }
+
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
+
+ std::atomic<uint64_t> readSleepUS_;
size_t bufferIdx = 0;
while (true) {
/* sleep override */
size_t bufferIdx = 0;
while (true) {
/* sleep override */
- usleep(FLAGS_async_discard_read_sleep_usec);
+ usleep(stats->getSleepUS().count());
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
+ // This test should always be run with at least 2 writer threads.
+ // The last one will use the NEVER_DISCARD flag, and we want at least
+ // one that does discard messages.
+ ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
+
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
std::vector<std::thread> writeThreads;
for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
std::vector<std::thread> writeThreads;
for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+ uint32_t flags = 0;
+ // Configure the last writer thread to never drop messages
+ if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+ flags = LogWriter::NEVER_DISCARD;
+ }
+
writeThreads.emplace_back(
writeThreads.emplace_back(
- writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
+ writeThread,
+ &writer,
+ n,
+ FLAGS_async_discard_messages_per_writer,
+ flags);
}
for (auto& t : writeThreads) {
}
for (auto& t : writeThreads) {
}
fprintf(stderr, "writers done\n");
}
}
fprintf(stderr, "writers done\n");
}
+ // Clear the read sleep duration so the reader will finish quickly now
+ readStats.clearSleepDuration();
reader.join();
readStats.check(
FLAGS_async_discard_num_writer_threads,
FLAGS_async_discard_messages_per_writer);
reader.join();
readStats.check(
FLAGS_async_discard_num_writer_threads,
FLAGS_async_discard_messages_per_writer);
+ // Check that no messages were dropped from the thread using the
+ // NEVER_DISCARD flag.
+ readStats.checkNoDrops(
+ FLAGS_async_discard_num_writer_threads - 1,
+ FLAGS_async_discard_messages_per_writer);
class TestLogWriter : public LogWriter {
public:
class TestLogWriter : public LogWriter {
public:
- void writeMessage(folly::StringPiece buffer) override {
+ void writeMessage(folly::StringPiece buffer, uint32_t /* flags */ = 0)
+ override {
messages_.emplace_back(buffer.str());
}
messages_.emplace_back(buffer.str());
}