Lift thrift/lib/cpp/test/TNotificationQueueTest.
authorYedidya Feldblum <yfeldblum@fb.com>
Thu, 2 Jul 2015 20:58:58 +0000 (13:58 -0700)
committerSara Golemon <sgolemon@fb.com>
Thu, 2 Jul 2015 22:46:01 +0000 (15:46 -0700)
Summary: [Folly] Lift thrift/lib/cpp/test/TNotificationQueueTest.

`NotificationQueue` is already moved into folly; move its accompanying test suite as well.

Reviewed By: @simpkins

Differential Revision: D2207104

folly/Makefile.am
folly/io/async/NotificationQueue.h
folly/io/async/ScopedEventBaseThread.cpp [new file with mode: 0644]
folly/io/async/ScopedEventBaseThread.h [new file with mode: 0644]
folly/io/async/test/NotificationQueueTest.cpp [new file with mode: 0644]
folly/io/async/test/ScopedEventBaseThreadTest.cpp [new file with mode: 0644]

index f0eaa9c96dbd4e90c385fcd296c404210c1a2881..ce8b6a781734c73e64994aeef92498c8841d675f 100644 (file)
@@ -201,6 +201,7 @@ nobase_follyinclude_HEADERS = \
        io/async/HHWheelTimer.h \
        io/async/Request.h \
        io/async/SSLContext.h \
+       io/async/ScopedEventBaseThread.h \
        io/async/TimeoutManager.h \
        io/async/test/AsyncSSLSocketTest.h \
        io/async/test/BlockingSocket.h \
@@ -385,6 +386,7 @@ libfolly_la_SOURCES = \
        io/async/EventBaseManager.cpp \
        io/async/EventHandler.cpp \
        io/async/SSLContext.cpp \
+       io/async/ScopedEventBaseThread.cpp \
        io/async/HHWheelTimer.cpp \
        io/async/test/TimeUtil.cpp \
        json.cpp \
index 6bdf16fae361fdca48467fb2e0306ffdeaa35e28..46c6a8911492e84e535264742bc4e1bf4c0b3f65 100644 (file)
@@ -120,7 +120,7 @@ class NotificationQueue {
      * @returns true if the queue was drained, false otherwise. In practice,
      * this will only fail if someone else is already draining the queue.
      */
-    bool consumeUntilDrained() noexcept;
+    bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
 
     /**
      * Get the NotificationQueue that this consumer is currently consuming
@@ -165,7 +165,7 @@ class NotificationQueue {
      *
      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
      */
-    void consumeMessages(bool isDrain) noexcept;
+    void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
 
     void setActive(bool active, bool shouldLock = false) {
       if (!queue_) {
@@ -595,11 +595,16 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
 template<typename MessageT>
 void NotificationQueue<MessageT>::Consumer::consumeMessages(
-    bool isDrain) noexcept {
+    bool isDrain, size_t* numConsumed) noexcept {
   uint32_t numProcessed = 0;
   bool firstRun = true;
   setActive(true);
   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
+  SCOPE_EXIT {
+    if (numConsumed != nullptr) {
+      *numConsumed = numProcessed;
+    }
+  };
   while (true) {
     // Try to decrement the eventfd.
     //
@@ -760,7 +765,8 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
 }
 
 template<typename MessageT>
-bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
+    size_t* numConsumed) noexcept {
   {
     folly::SpinLockGuard g(queue_->spinlock_);
     if (queue_->draining_) {
@@ -768,7 +774,7 @@ bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
     }
     queue_->draining_ = true;
   }
-  consumeMessages(true);
+  consumeMessages(true, numConsumed);
   {
     folly::SpinLockGuard g(queue_->spinlock_);
     queue_->draining_ = false;
diff --git a/folly/io/async/ScopedEventBaseThread.cpp b/folly/io/async/ScopedEventBaseThread.cpp
new file mode 100644 (file)
index 0000000..29a4459
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/ScopedEventBaseThread.h>
+
+#include <thread>
+#include <folly/Memory.h>
+
+using namespace std;
+
+namespace folly {
+
+ScopedEventBaseThread::ScopedEventBaseThread(bool autostart) {
+  if (autostart) {
+    start();
+  }
+}
+
+ScopedEventBaseThread::~ScopedEventBaseThread() {
+  stop();
+}
+
+ScopedEventBaseThread::ScopedEventBaseThread(
+    ScopedEventBaseThread&& other) noexcept = default;
+
+ScopedEventBaseThread& ScopedEventBaseThread::operator=(
+    ScopedEventBaseThread&& other) noexcept = default;
+
+void ScopedEventBaseThread::start() {
+  if (running()) {
+    return;
+  }
+  eventBase_ = make_unique<EventBase>();
+  thread_ = make_unique<thread>(&EventBase::loopForever, &*eventBase_);
+  eventBase_->waitUntilRunning();
+}
+
+void ScopedEventBaseThread::stop() {
+  if (!running()) {
+    return;
+  }
+  eventBase_->terminateLoopSoon();
+  thread_->join();
+  eventBase_ = nullptr;
+  thread_ = nullptr;
+}
+
+bool ScopedEventBaseThread::running() {
+  CHECK(bool(eventBase_) == bool(thread_));
+  return eventBase_ && thread_;
+}
+
+}
diff --git a/folly/io/async/ScopedEventBaseThread.h b/folly/io/async/ScopedEventBaseThread.h
new file mode 100644 (file)
index 0000000..60d62d7
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <folly/io/async/EventBase.h>
+
+namespace std {
+class thread;
+}
+
+namespace folly {
+
+/**
+ * A helper class to start a new thread running a TEventBase loop.
+ *
+ * The new thread will be started by the ScopedEventBaseThread constructor.
+ * When the ScopedEventBaseThread object is destroyed, the thread will be
+ * stopped.
+ */
+class ScopedEventBaseThread {
+ public:
+  explicit ScopedEventBaseThread(bool autostart = true);
+  ~ScopedEventBaseThread();
+
+  ScopedEventBaseThread(ScopedEventBaseThread&& other) noexcept;
+  ScopedEventBaseThread &operator=(ScopedEventBaseThread&& other) noexcept;
+
+  /**
+   * Get a pointer to the TEventBase driving this thread.
+   */
+  EventBase* getEventBase() const {
+    return eventBase_.get();
+  }
+
+  void start();
+  void stop();
+  bool running();
+
+ private:
+  ScopedEventBaseThread(const ScopedEventBaseThread& other) = delete;
+  ScopedEventBaseThread& operator=(const ScopedEventBaseThread& other) = delete;
+
+  std::unique_ptr<EventBase> eventBase_;
+  std::unique_ptr<std::thread> thread_;
+};
+
+}
diff --git a/folly/io/async/test/NotificationQueueTest.cpp b/folly/io/async/test/NotificationQueueTest.cpp
new file mode 100644 (file)
index 0000000..c831ee1
--- /dev/null
@@ -0,0 +1,642 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+
+#include <folly/Baton.h>
+#include <list>
+#include <iostream>
+#include <thread>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <gtest/gtest.h>
+
+using namespace std;
+using namespace folly;
+
+typedef NotificationQueue<int> IntQueue;
+
+class QueueConsumer : public IntQueue::Consumer {
+ public:
+  QueueConsumer() {}
+
+  void messageAvailable(int&& value) override {
+    messages.push_back(value);
+    if (fn) {
+      fn(value);
+    }
+  }
+
+  std::function<void(int)> fn;
+  std::deque<int> messages;
+};
+
+class QueueTest {
+ public:
+  explicit QueueTest(uint32_t maxSize = 0,
+                     IntQueue::FdType type = IntQueue::FdType::EVENTFD) :
+      queue(maxSize, type),
+      terminationQueue(maxSize, type)
+    {}
+
+  void sendOne();
+  void putMessages();
+  void multiConsumer();
+  void maxQueueSize();
+  void maxReadAtOnce();
+  void destroyCallback();
+  void useAfterFork();
+
+  IntQueue queue;
+  IntQueue terminationQueue;
+
+};
+
+void QueueTest::sendOne() {
+  // Create a notification queue and a callback in this thread
+  EventBase eventBase;
+
+  QueueConsumer consumer;
+  consumer.fn = [&](int) {
+    // Stop consuming after we receive 1 message
+    consumer.stopConsuming();
+  };
+  consumer.startConsuming(&eventBase, &queue);
+
+  // Start a new EventBase thread to put a message on our queue
+  ScopedEventBaseThread t1;
+  t1.getEventBase()->runInEventBaseThread([&] {
+    queue.putMessage(5);
+  });
+
+  // Loop until we receive the message
+  eventBase.loop();
+
+  const auto& messages = consumer.messages;
+  EXPECT_EQ(1, messages.size());
+  EXPECT_EQ(5, messages.at(0));
+}
+
+void QueueTest::putMessages() {
+  EventBase eventBase;
+
+  QueueConsumer consumer;
+  QueueConsumer consumer2;
+  consumer.fn = [&](int msg) {
+    // Stop consuming after we receive a message with value 0, and start
+    // consumer2
+    if (msg == 0) {
+      consumer.stopConsuming();
+      consumer2.startConsuming(&eventBase, &queue);
+    }
+  };
+  consumer2.fn = [&](int msg) {
+    // Stop consuming after we receive a message with value 0
+    if (msg == 0) {
+      consumer2.stopConsuming();
+    }
+  };
+  consumer.startConsuming(&eventBase, &queue);
+
+  list<int> msgList = { 1, 2, 3, 4 };
+  vector<int> msgVector = { 5, 0, 9, 8, 7, 6, 7, 7,
+                            8, 8, 2, 9, 6, 6, 10, 2, 0 };
+  // Call putMessages() several times to add messages to the queue
+  queue.putMessages(msgList.begin(), msgList.end());
+  queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
+  // Test sending 17 messages, the pipe-based queue calls write in 16 byte
+  // chunks
+  queue.putMessages(msgVector.begin(), msgVector.end());
+
+  // Loop until the consumer has stopped
+  eventBase.loop();
+
+  vector<int> expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 };
+  vector<int> expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 };
+  EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
+  for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
+    EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
+  }
+  EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
+  for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
+    EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
+  }
+}
+
+void QueueTest::multiConsumer() {
+  uint32_t numConsumers = 8;
+  uint32_t numMessages = 10000;
+
+  // Create several consumers each running in their own EventBase thread
+  vector<QueueConsumer> consumers(numConsumers);
+  vector<ScopedEventBaseThread> threads(numConsumers);
+
+  for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
+    QueueConsumer* consumer = &consumers[consumerIdx];
+
+    consumer->fn = [consumer, consumerIdx, this](int value) {
+      // Treat 0 as a signal to stop.
+      if (value == 0) {
+        consumer->stopConsuming();
+        // Put a message on the terminationQueue to indicate we have stopped
+        terminationQueue.putMessage(consumerIdx);
+      }
+    };
+
+    EventBase* eventBase = threads[consumerIdx].getEventBase();
+    eventBase->runInEventBaseThread([eventBase, consumer, this] {
+      consumer->startConsuming(eventBase, &queue);
+    });
+  }
+
+  // Now add a number of messages from this thread
+  // Start at 1 rather than 0, since 0 is the signal to stop.
+  for (uint32_t n = 1; n < numMessages; ++n) {
+    queue.putMessage(n);
+  }
+  // Now add a 0 for each consumer, to signal them to stop
+  for (uint32_t n = 0; n < numConsumers; ++n) {
+    queue.putMessage(0);
+  }
+
+  // Wait until we get notified that all of the consumers have stopped
+  // We use a separate notification queue for this.
+  QueueConsumer terminationConsumer;
+  vector<uint32_t> consumersStopped(numConsumers, 0);
+  uint32_t consumersRemaining = numConsumers;
+  terminationConsumer.fn = [&](int consumerIdx) {
+    --consumersRemaining;
+    if (consumersRemaining == 0) {
+      terminationConsumer.stopConsuming();
+    }
+
+    EXPECT_GE(consumerIdx, 0);
+    EXPECT_LT(consumerIdx, numConsumers);
+    ++consumersStopped[consumerIdx];
+  };
+  EventBase eventBase;
+  terminationConsumer.startConsuming(&eventBase, &terminationQueue);
+  eventBase.loop();
+
+  // Verify that we saw exactly 1 stop message for each consumer
+  for (uint32_t n = 0; n < numConsumers; ++n) {
+    EXPECT_EQ(1, consumersStopped[n]);
+  }
+
+  // Validate that every message sent to the main queue was received exactly
+  // once.
+  vector<int> messageCount(numMessages, 0);
+  for (uint32_t n = 0; n < numConsumers; ++n) {
+    for (int msg : consumers[n].messages) {
+      EXPECT_GE(msg, 0);
+      EXPECT_LT(msg, numMessages);
+      ++messageCount[msg];
+    }
+  }
+
+  // 0 is the signal to stop, and should have been received once by each
+  // consumer
+  EXPECT_EQ(numConsumers, messageCount[0]);
+  // All other messages should have been received exactly once
+  for (uint32_t n = 1; n < numMessages; ++n) {
+    EXPECT_EQ(1, messageCount[n]);
+  }
+}
+
+void QueueTest::maxQueueSize() {
+  // Create a queue with a maximum size of 5, and fill it up
+
+  for (int n = 0; n < 5; ++n) {
+    queue.tryPutMessage(n);
+  }
+
+  // Calling tryPutMessage() now should fail
+  EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
+
+  EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
+  int val = 5;
+  EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
+
+  // Pop a message from the queue
+  int result = -1;
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(0, result);
+
+  // We should be able to write another message now that we popped one off.
+  queue.tryPutMessage(5);
+  // But now we are full again.
+  EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
+  // putMessage() should let us exceed the maximum
+  queue.putMessage(6);
+
+  // Pull another mesage off
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(1, result);
+
+  // tryPutMessage() should still fail since putMessage() actually put us over
+  // the max.
+  EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
+
+  // Pull another message off and try again
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(2, result);
+  queue.tryPutMessage(7);
+
+  // Now pull all the remaining messages off
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(3, result);
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(4, result);
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(5, result);
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(6, result);
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(7, result);
+
+  // There should be no messages left
+  result = -1;
+  EXPECT_TRUE(!queue.tryConsume(result));
+  EXPECT_EQ(-1, result);
+}
+
+
+void QueueTest::maxReadAtOnce() {
+  // Add 100 messages to the queue
+  for (int n = 0; n < 100; ++n) {
+    queue.putMessage(n);
+  }
+
+  EventBase eventBase;
+
+  // Record how many messages were processed each loop iteration.
+  uint32_t messagesThisLoop = 0;
+  std::vector<uint32_t> messagesPerLoop;
+  std::function<void()> loopFinished = [&] {
+    // Record the current number of messages read this loop
+    messagesPerLoop.push_back(messagesThisLoop);
+    // Reset messagesThisLoop to 0 for the next loop
+    messagesThisLoop = 0;
+
+    // To prevent use-after-free bugs when eventBase destructs,
+    // prevent calling runInLoop any more after the test is finished.
+    // 55 == number of times loop should run.
+    if (messagesPerLoop.size() != 55) {
+      // Reschedule ourself to run at the end of the next loop
+      eventBase.runInLoop(loopFinished);
+    }
+  };
+  // Schedule the first call to loopFinished
+  eventBase.runInLoop(loopFinished);
+
+  QueueConsumer consumer;
+  // Read the first 50 messages 10 at a time.
+  consumer.setMaxReadAtOnce(10);
+  consumer.fn = [&](int value) {
+    ++messagesThisLoop;
+    // After 50 messages, drop to reading only 1 message at a time.
+    if (value == 50) {
+      consumer.setMaxReadAtOnce(1);
+    }
+    // Terminate the loop when we reach the end of the messages.
+    if (value == 99) {
+      eventBase.terminateLoopSoon();
+    }
+  };
+  consumer.startConsuming(&eventBase, &queue);
+
+  // Run the event loop until the consumer terminates it
+  eventBase.loop();
+
+  // The consumer should have read all 100 messages in order
+  EXPECT_EQ(100, consumer.messages.size());
+  for (int n = 0; n < 100; ++n) {
+    EXPECT_EQ(n, consumer.messages.at(n));
+  }
+
+  // Currently EventBase happens to still run the loop callbacks even after
+  // terminateLoopSoon() is called.  However, we don't really want to depend on
+  // this behavior.  In case this ever changes in the future, add
+  // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
+  // last loop iteration.
+  if (messagesThisLoop > 0) {
+    messagesPerLoop.push_back(messagesThisLoop);
+    messagesThisLoop = 0;
+  }
+
+  // For the first 5 loops it should have read 10 messages each time.
+  // After that it should have read 1 messages per loop for the next 50 loops.
+  EXPECT_EQ(55, messagesPerLoop.size());
+  for (int n = 0; n < 5; ++n) {
+    EXPECT_EQ(10, messagesPerLoop.at(n));
+  }
+  for (int n = 5; n < 55; ++n) {
+    EXPECT_EQ(1, messagesPerLoop.at(n));
+  }
+}
+
+
+void QueueTest::destroyCallback() {
+  // Rather than using QueueConsumer, define a separate class for the destroy
+  // test.  The DestroyTestConsumer will delete itself inside the
+  // messageAvailable() callback.  With a regular QueueConsumer this would
+  // destroy the std::function object while the function is running, which we
+  // should probably avoid doing.  This uses a pointer to a std::function to
+  // avoid destroying the function object.
+  class DestroyTestConsumer : public IntQueue::Consumer {
+   public:
+    DestroyTestConsumer() {}
+
+    void messageAvailable(int&& value) override {
+      if (fn && *fn) {
+        (*fn)(value);
+      }
+    }
+
+    std::function<void(int)> *fn;
+  };
+
+  EventBase eventBase;
+  // Create a queue and add 2 messages to it
+  queue.putMessage(1);
+  queue.putMessage(2);
+
+  // Create two QueueConsumers allocated on the heap.
+  // Have whichever one gets called first destroy both of the QueueConsumers.
+  // This way one consumer will be destroyed from inside its messageAvailable()
+  // callback, and one consume will be destroyed when it isn't inside
+  // messageAvailable().
+  std::unique_ptr<DestroyTestConsumer> consumer1(new DestroyTestConsumer);
+  std::unique_ptr<DestroyTestConsumer> consumer2(new DestroyTestConsumer);
+  std::function<void(int)> fn = [&](int) {
+    consumer1.reset();
+    consumer2.reset();
+  };
+  consumer1->fn = &fn;
+  consumer2->fn = &fn;
+
+  consumer1->startConsuming(&eventBase, &queue);
+  consumer2->startConsuming(&eventBase, &queue);
+
+  // Run the event loop.
+  eventBase.loop();
+
+  // One of the consumers should have fired, received the message,
+  // then destroyed both consumers.
+  EXPECT_TRUE(!consumer1);
+  EXPECT_TRUE(!consumer2);
+  // One message should be left in the queue
+  int result = 1;
+  EXPECT_TRUE(queue.tryConsume(result));
+  EXPECT_EQ(2, result);
+}
+
+TEST(NotificationQueueTest, ConsumeUntilDrained) {
+  // Basic tests: make sure we
+  // - drain all the messages
+  // - ignore any maxReadAtOnce
+  // - can't add messages during draining
+  EventBase eventBase;
+  IntQueue queue;
+  QueueConsumer consumer;
+  consumer.fn = [&](int i) {
+    EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
+    EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
+    EXPECT_THROW(queue.putMessage(i), std::runtime_error);
+    std::vector<int> ints{1, 2, 3};
+    EXPECT_THROW(
+        queue.putMessages(ints.begin(), ints.end()),
+        std::runtime_error);
+  };
+  consumer.setMaxReadAtOnce(10); // We should ignore this
+  consumer.startConsuming(&eventBase, &queue);
+  for (int i = 0; i < 20; i++) {
+    queue.putMessage(i);
+  }
+  EXPECT_TRUE(consumer.consumeUntilDrained());
+  EXPECT_EQ(20, consumer.messages.size());
+
+  // Make sure there can only be one drainer at once
+  folly::Baton<> callbackBaton, threadStartBaton;
+  consumer.fn = [&](int i) {
+    callbackBaton.wait();
+  };
+  QueueConsumer competingConsumer;
+  competingConsumer.startConsuming(&eventBase, &queue);
+  queue.putMessage(1);
+  atomic<bool> raceA {false};
+  atomic<bool> raceB {false};
+  size_t numConsA = 0;
+  size_t numConsB = 0;
+  auto thread = std::thread([&]{
+    threadStartBaton.post();
+    raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
+  });
+  threadStartBaton.wait();
+  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
+  callbackBaton.post();
+  thread.join();
+  EXPECT_FALSE(raceA && raceB);
+  EXPECT_TRUE(raceA || raceB);
+  EXPECT_TRUE(raceA ^ raceB);
+}
+
+TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
+  for (size_t i = 0; i < 1 << 8; ++i) {
+    // Basic tests: make sure we
+    // - drain all the messages
+    // - ignore any maxReadAtOnce
+    // - can't add messages during draining
+    EventBase eventBase;
+    IntQueue queue;
+    QueueConsumer consumer;
+    consumer.fn = [&](int i) {
+      EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
+      EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
+      EXPECT_THROW(queue.putMessage(i), std::runtime_error);
+      std::vector<int> ints{1, 2, 3};
+      EXPECT_THROW(
+          queue.putMessages(ints.begin(), ints.end()),
+          std::runtime_error);
+    };
+    consumer.setMaxReadAtOnce(10); // We should ignore this
+    consumer.startConsuming(&eventBase, &queue);
+    for (int i = 0; i < 20; i++) {
+      queue.putMessage(i);
+    }
+    EXPECT_TRUE(consumer.consumeUntilDrained());
+    EXPECT_EQ(20, consumer.messages.size());
+
+    // Make sure there can only be one drainer at once
+    folly::Baton<> callbackBaton, threadStartBaton;
+    consumer.fn = [&](int i) {
+      callbackBaton.wait();
+    };
+    QueueConsumer competingConsumer;
+    competingConsumer.startConsuming(&eventBase, &queue);
+    queue.putMessage(1);
+    atomic<bool> raceA {false};
+    atomic<bool> raceB {false};
+    size_t numConsA = 0;
+    size_t numConsB = 0;
+    auto thread = std::thread([&]{
+      threadStartBaton.post();
+      raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
+    });
+    threadStartBaton.wait();
+    raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
+    callbackBaton.post();
+    thread.join();
+    EXPECT_FALSE(raceA && raceB);
+    EXPECT_TRUE(raceA || raceB);
+    EXPECT_TRUE(raceA ^ raceB);
+  }
+}
+
+TEST(NotificationQueueTest, SendOne) {
+  QueueTest qt;
+  qt.sendOne();
+}
+
+TEST(NotificationQueueTest, PutMessages) {
+  QueueTest qt;
+  qt.sendOne();
+}
+
+TEST(NotificationQueueTest, MultiConsumer) {
+  QueueTest qt;
+  qt.multiConsumer();
+}
+
+TEST(NotificationQueueTest, MaxQueueSize) {
+  QueueTest qt(5);
+  qt.maxQueueSize();
+}
+
+TEST(NotificationQueueTest, MaxReadAtOnce) {
+  QueueTest qt;
+  qt.maxReadAtOnce();
+}
+
+TEST(NotificationQueueTest, DestroyCallback) {
+  QueueTest qt;
+  qt.destroyCallback();
+}
+
+TEST(NotificationQueueTest, SendOnePipe) {
+  QueueTest qt(0, IntQueue::FdType::PIPE);
+  qt.sendOne();
+}
+
+TEST(NotificationQueueTest, PutMessagesPipe) {
+  QueueTest qt(0, IntQueue::FdType::PIPE);
+  qt.sendOne();
+}
+
+TEST(NotificationQueueTest, MultiConsumerPipe) {
+  QueueTest qt(0, IntQueue::FdType::PIPE);
+  qt.multiConsumer();
+}
+
+TEST(NotificationQueueTest, MaxQueueSizePipe) {
+  QueueTest qt(5, IntQueue::FdType::PIPE);
+  qt.maxQueueSize();
+}
+
+TEST(NotificationQueueTest, MaxReadAtOncePipe) {
+  QueueTest qt(0, IntQueue::FdType::PIPE);
+  qt.maxReadAtOnce();
+}
+
+TEST(NotificationQueueTest, DestroyCallbackPipe) {
+  QueueTest qt(0, IntQueue::FdType::PIPE);
+  qt.destroyCallback();
+}
+
+/*
+ * Test code that creates a TNotificationQueue, then forks, and incorrectly
+ * tries to send a message to the queue from the child process.
+ *
+ * The child process should crash in this scenario, since the child code has a
+ * bug.  (Older versions of TNotificationQueue didn't catch this in the child,
+ * resulting in a crash in the parent process.)
+ */
+TEST(NotificationQueueTest, UseAfterFork) {
+  IntQueue queue;
+  int childStatus = 0;
+  QueueConsumer consumer;
+
+  // Boost sets a custom SIGCHLD handler, which fails the test if a child
+  // process exits abnormally.  We don't want this.
+  signal(SIGCHLD, SIG_DFL);
+
+  // Log some info so users reading the test output aren't confused
+  // by the child process' crash log messages.
+  LOG(INFO) << "This test makes sure the child process crashes.  "
+    << "Error log messagges and a backtrace are expected.";
+
+  {
+    // Start a separate thread consuming from the queue
+    ScopedEventBaseThread t1;
+    t1.getEventBase()->runInEventBaseThread([&] {
+      consumer.startConsuming(t1.getEventBase(), &queue);
+    });
+
+    // Send a message to it, just for sanity checking
+    queue.putMessage(1234);
+
+    // Fork
+    pid_t pid = fork();
+    if (pid == 0) {
+      // The boost test framework installs signal handlers to catch errors.
+      // We only want to catch in the parent.  In the child let SIGABRT crash
+      // us normally.
+      signal(SIGABRT, SIG_DFL);
+
+      // Child.
+      // We're horrible people, so we try to send a message to the queue
+      // that is being consumed in the parent process.
+      //
+      // The putMessage() call should catch this error, and crash our process.
+      queue.putMessage(9876);
+      // We shouldn't reach here.
+      _exit(0);
+    }
+
+    // Parent.  Wait for the child to exit.
+    auto waited = waitpid(pid, &childStatus, 0);
+    EXPECT_EQ(pid, waited);
+
+    // Send another message to the queue before we terminate the thread.
+    queue.putMessage(5678);
+  }
+
+  // The child process should have crashed when it tried to call putMessage()
+  // on our TNotificationQueue.
+  EXPECT_TRUE(WIFSIGNALED(childStatus));
+  EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
+
+  // Make sure the parent saw the expected messages.
+  // It should have gotten 1234 and 5678 from the parent process, but not
+  // 9876 from the child.
+  EXPECT_EQ(2, consumer.messages.size());
+  EXPECT_EQ(1234, consumer.messages.front());
+  consumer.messages.pop_front();
+  EXPECT_EQ(5678, consumer.messages.front());
+  consumer.messages.pop_front();
+}
diff --git a/folly/io/async/test/ScopedEventBaseThreadTest.cpp b/folly/io/async/test/ScopedEventBaseThreadTest.cpp
new file mode 100644 (file)
index 0000000..ba94518
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/ScopedEventBaseThread.h>
+
+#include <chrono>
+#include <folly/Baton.h>
+
+#include <gtest/gtest.h>
+
+using namespace std;
+using namespace std::chrono;
+using namespace folly;
+
+class ScopedEventBaseThreadTest : public testing::Test {};
+
+TEST_F(ScopedEventBaseThreadTest, example) {
+  ScopedEventBaseThread sebt;
+
+  Baton<> done;
+  sebt.getEventBase()->runInEventBaseThread([&] { done.post(); });
+  done.timed_wait(steady_clock::now() + milliseconds(100));
+}
+
+TEST_F(ScopedEventBaseThreadTest, start_stop) {
+  ScopedEventBaseThread sebt(false);
+
+  for (size_t i = 0; i < 4; ++i) {
+    EXPECT_EQ(nullptr, sebt.getEventBase());
+    sebt.start();
+    EXPECT_NE(nullptr, sebt.getEventBase());
+
+    Baton<> done;
+    sebt.getEventBase()->runInEventBaseThread([&] { done.post(); });
+    done.timed_wait(steady_clock::now() + milliseconds(100));
+
+    EXPECT_NE(nullptr, sebt.getEventBase());
+    sebt.stop();
+    EXPECT_EQ(nullptr, sebt.getEventBase());
+  }
+}
+
+TEST_F(ScopedEventBaseThreadTest, move) {
+  auto sebt0 = ScopedEventBaseThread();
+  auto sebt1 = std::move(sebt0);
+  auto sebt2 = std::move(sebt1);
+
+  EXPECT_EQ(nullptr, sebt0.getEventBase());
+  EXPECT_EQ(nullptr, sebt1.getEventBase());
+  EXPECT_NE(nullptr, sebt2.getEventBase());
+
+  Baton<> done;
+  sebt2.getEventBase()->runInEventBaseThread([&] { done.post(); });
+  done.timed_wait(steady_clock::now() + milliseconds(100));
+}
+
+TEST_F(ScopedEventBaseThreadTest, self_move) {
+  ScopedEventBaseThread sebt;
+  sebt = std::move(sebt);
+
+  EXPECT_NE(nullptr, sebt.getEventBase());
+
+  Baton<> done;
+  sebt.getEventBase()->runInEventBaseThread([&] { done.post(); });
+  done.timed_wait(steady_clock::now() + milliseconds(100));
+}