Copyright 2014->2015
[folly.git] / folly / io / async / NotificationQueue.h
index d0afd3e414e3d166f86f8ed42cc0e7ea4dc4fdc5..95c85160d6403e0353f9d81b3acb9ee15ea6fa95 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <unistd.h>
 
 #include <folly/io/async/EventBase.h>
-#include <folly/io/async/EventFDWrapper.h>
 #include <folly/io/async/EventHandler.h>
 #include <folly/io/async/Request.h>
 #include <folly/Likely.h>
-#include <folly/SmallLocks.h>
 #include <folly/ScopeGuard.h>
+#include <folly/SpinLock.h>
 
 #include <glog/logging.h>
 #include <deque>
 
+#if __linux__ && !__ANDROID__
+#define FOLLY_HAVE_EVENTFD
+#include <folly/io/async/EventFDWrapper.h>
+#endif
+
 namespace folly {
 
 /**
@@ -107,6 +111,17 @@ class NotificationQueue {
      */
     void stopConsuming();
 
+    /**
+     * Consume messages off the queue until it is empty. No messages may be
+     * added to the queue while it is draining, so that the process is bounded.
+     * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
+     * and tryPutMessageNoThrow will return false.
+     *
+     * @returns true if the queue was drained, false otherwise. In practice,
+     * this will only fail if someone else is already draining the queue.
+     */
+    bool consumeUntilDrained() noexcept;
+
     /**
      * Get the NotificationQueue that this consumer is currently consuming
      * messages from.  Returns nullptr if the consumer is not currently
@@ -140,6 +155,17 @@ class NotificationQueue {
     virtual void handlerReady(uint16_t events) noexcept;
 
    private:
+    /**
+     * Consume messages off the the queue until
+     *   - the queue is empty (1), or
+     *   - until the consumer is destroyed, or
+     *   - until the consumer is uninstalled, or
+     *   - an exception is thrown in the course of dequeueing, or
+     *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
+     *
+     * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
+     */
+    void consumeMessages(bool isDrain) noexcept;
 
     void setActive(bool active, bool shouldLock = false) {
       if (!queue_) {
@@ -169,8 +195,10 @@ class NotificationQueue {
   };
 
   enum class FdType {
+    PIPE,
+#ifdef FOLLY_HAVE_EVENTFD
     EVENTFD,
-    PIPE
+#endif
   };
 
   /**
@@ -189,17 +217,20 @@ class NotificationQueue {
    * mostly for testing purposes.
    */
   explicit NotificationQueue(uint32_t maxSize = 0,
-                              FdType fdType = FdType::EVENTFD)
+#ifdef FOLLY_HAVE_EVENTFD
+                             FdType fdType = FdType::EVENTFD)
+#else
+                             FdType fdType = FdType::PIPE)
+#endif
     : eventfd_(-1),
       pipeFds_{-1, -1},
       advisoryMaxQueueSize_(maxSize),
       pid_(getpid()),
       queue_() {
 
-    spinlock_.init();
-
     RequestContext::getStaticContext();
 
+#ifdef FOLLY_HAVE_EVENTFD
     if (fdType == FdType::EVENTFD) {
       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
       if (eventfd_ == -1) {
@@ -216,6 +247,7 @@ class NotificationQueue {
         }
       }
     }
+#endif
     if (fdType == FdType::PIPE) {
       if (pipe(pipeFds_)) {
         folly::throwSystemError("Failed to create pipe for NotificationQueue",
@@ -272,6 +304,8 @@ class NotificationQueue {
    * If the queue is full, a std::overflow_error will be thrown.  The
    * setMaxQueueSize() function controls the maximum queue size.
    *
+   * If the queue is currently draining, an std::runtime_error will be thrown.
+   *
    * This method may contend briefly on a spinlock if many threads are
    * concurrently accessing the queue, but for all intents and purposes it will
    * immediately place the message on the queue and return.
@@ -291,8 +325,9 @@ class NotificationQueue {
    * No-throw versions of the above.  Instead returns true on success, false on
    * failure.
    *
-   * Only std::overflow_error is prevented from being thrown (since this is the
-   * common exception case), user code must still catch std::bad_alloc errors.
+   * Only std::overflow_error (the common exception case) and std::runtime_error
+   * (which indicates that the queue is being drained) are prevented from being
+   * thrown. User code must still catch std::bad_alloc errors.
    */
   bool tryPutMessageNoThrow(MessageT&& message) {
     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
@@ -308,8 +343,10 @@ class NotificationQueue {
    * and always puts the message on the queue, even if the maximum queue size
    * would be exceeded.
    *
-   * putMessage() may throw std::bad_alloc if memory allocation fails, and may
-   * throw any other exception thrown by the MessageT move/copy constructor.
+   * putMessage() may throw
+   *   - std::bad_alloc if memory allocation fails, and may
+   *   - std::runtime_error if the queue is currently draining
+   *   - any other exception thrown by the MessageT move/copy constructor.
    */
   void putMessage(MessageT&& message) {
     putMessageImpl(std::move(message), 0);
@@ -342,7 +379,7 @@ class NotificationQueue {
 
     try {
 
-      folly::MSLGuard g(spinlock_);
+      folly::SpinLockGuard g(spinlock_);
 
       if (UNLIKELY(queue_.empty())) {
         return false;
@@ -366,7 +403,7 @@ class NotificationQueue {
   }
 
   int size() {
-    folly::MSLGuard g(spinlock_);
+    folly::SpinLockGuard g(spinlock_);
     return queue_.size();
   }
 
@@ -393,7 +430,7 @@ class NotificationQueue {
   NotificationQueue& operator=(NotificationQueue const &) = delete;
 
   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
-    DCHECK(0 == spinlock_.try_lock());
+    DCHECK(0 == spinlock_.trylock());
     if (maxSize > 0 && queue_.size() >= maxSize) {
       if (throws) {
         throw std::overflow_error("unable to add message to NotificationQueue: "
@@ -404,6 +441,13 @@ class NotificationQueue {
     return true;
   }
 
+  inline bool checkDraining(bool throws=true) {
+    if (UNLIKELY(draining_ && throws)) {
+      throw std::runtime_error("queue is draining, cannot add message");
+    }
+    return draining_;
+  }
+
   inline void signalEvent(size_t numAdded = 1) const {
     static const uint8_t kPipeMessage[] = {
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
@@ -461,8 +505,8 @@ class NotificationQueue {
     checkPid();
     bool signal = false;
     {
-      folly::MSLGuard g(spinlock_);
-      if (!checkQueueSize(maxSize, throws)) {
+      folly::SpinLockGuard g(spinlock_);
+      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
         return false;
       }
       // We only need to signal an event if not all consumers are
@@ -485,8 +529,8 @@ class NotificationQueue {
     checkPid();
     bool signal = false;
     {
-      folly::MSLGuard g(spinlock_);
-      if (!checkQueueSize(maxSize, throws)) {
+      folly::SpinLockGuard g(spinlock_);
+      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
         return false;
       }
       if (numActiveConsumers_ < numConsumers_) {
@@ -507,7 +551,8 @@ class NotificationQueue {
     bool signal = false;
     size_t numAdded = 0;
     {
-      folly::MSLGuard g(spinlock_);
+      folly::SpinLockGuard g(spinlock_);
+      checkDraining();
       while (first != last) {
         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
         ++first;
@@ -522,7 +567,7 @@ class NotificationQueue {
     }
   }
 
-  mutable folly::MicroSpinLock spinlock_;
+  mutable folly::SpinLock spinlock_;
   int eventfd_;
   int pipeFds_[2]; // to fallback to on older/non-linux systems
   uint32_t advisoryMaxQueueSize_;
@@ -530,6 +575,7 @@ class NotificationQueue {
   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
   int numConsumers_{0};
   std::atomic<int> numActiveConsumers_{0};
+  bool draining_{false};
 };
 
 template<typename MessageT>
@@ -546,6 +592,12 @@ NotificationQueue<MessageT>::Consumer::~Consumer() {
 template<typename MessageT>
 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
     noexcept {
+  consumeMessages(false);
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::consumeMessages(
+    bool isDrain) noexcept {
   uint32_t numProcessed = 0;
   bool firstRun = true;
   setActive(true);
@@ -557,7 +609,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
     // may not actually be an event available (another consumer may
     // have read it).  We don't really care, we only care about
     // emptying the queue.
-    if (firstRun) {
+    if (!isDrain && firstRun) {
       queue_->tryConsumeEvent();
       firstRun = false;
     }
@@ -576,6 +628,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
     try {
       if (UNLIKELY(queue_->queue_.empty())) {
         // If there is no message, we've reached the end of the queue, return.
+        setActive(false);
         queue_->spinlock_.unlock();
         return;
       }
@@ -621,7 +674,8 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
       // If we have hit maxReadAtOnce_, we are done.
       ++numProcessed;
-      if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+      if (!isDrain && maxReadAtOnce_ > 0 &&
+          numProcessed >= maxReadAtOnce_) {
         queue_->signalEvent(1);
         return;
       }
@@ -652,7 +706,9 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
         // Push a notification back on the eventfd since we didn't actually
         // read the message off of the queue.
-        queue_->signalEvent(1);
+        if (!isDrain) {
+          queue_->signalEvent(1);
+        }
       }
 
       return;
@@ -674,7 +730,7 @@ void NotificationQueue<MessageT>::Consumer::init(
   queue_ = queue;
 
   {
-    folly::MSLGuard g(queue_->spinlock_);
+    folly::SpinLockGuard g(queue_->spinlock_);
     queue_->numConsumers_++;
   }
   queue_->signalEvent();
@@ -694,7 +750,7 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   }
 
   {
-    folly::MSLGuard g(queue_->spinlock_);
+    folly::SpinLockGuard g(queue_->spinlock_);
     queue_->numConsumers_--;
     setActive(false);
   }
@@ -705,4 +761,21 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   queue_ = nullptr;
 }
 
+template<typename MessageT>
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+  {
+    folly::SpinLockGuard g(queue_->spinlock_);
+    if (queue_->draining_) {
+      return false;
+    }
+    queue_->draining_ = true;
+  }
+  consumeMessages(true);
+  {
+    folly::SpinLockGuard g(queue_->spinlock_);
+    queue_->draining_ = false;
+  }
+  return true;
+}
+
 } // folly