Add unit test for timeout=0
[folly.git] / folly / io / async / NotificationQueue.h
index 28a2878eb81ecde122f68b5e440d870bdb0859bc..e1b1cb36fe29b3ea4dd4149d28fd1d517b13d0bf 100644 (file)
 #include <stdexcept>
 #include <utility>
 
+#include <folly/Exception.h>
 #include <folly/FileUtil.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/EventHandler.h>
-#include <folly/io/async/DelayedDestruction.h>
-#include <folly/io/async/Request.h>
 #include <folly/Likely.h>
 #include <folly/ScopeGuard.h>
 #include <folly/SpinLock.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/Request.h>
 #include <folly/portability/Fcntl.h>
 #include <folly/portability/Sockets.h>
 #include <folly/portability/Unistd.h>
@@ -64,7 +65,7 @@ namespace folly {
  * spinning trying to move a message off the queue and failing, and then
  * retrying.
  */
-template<typename MessageT>
+template <typename MessageT>
 class NotificationQueue {
  public:
   /**
@@ -171,7 +172,7 @@ class NotificationQueue {
 
     void destroy() override;
 
-    virtual ~Consumer() {}
+    ~Consumer() override {}
 
    private:
     /**
@@ -351,11 +352,9 @@ class NotificationQueue {
    * may throw any other exception thrown by the MessageT move/copy
    * constructor.
    */
-  void tryPutMessage(MessageT&& message) {
-    putMessageImpl(std::move(message), advisoryMaxQueueSize_);
-  }
-  void tryPutMessage(const MessageT& message) {
-    putMessageImpl(message, advisoryMaxQueueSize_);
+  template <typename MessageTT>
+  void tryPutMessage(MessageTT&& message) {
+    putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
   }
 
   /**
@@ -366,11 +365,10 @@ class NotificationQueue {
    * (which indicates that the queue is being drained) are prevented from being
    * thrown. User code must still catch std::bad_alloc errors.
    */
-  bool tryPutMessageNoThrow(MessageT&& message) {
-    return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
-  }
-  bool tryPutMessageNoThrow(const MessageT& message) {
-    return putMessageImpl(message, advisoryMaxQueueSize_, false);
+  template <typename MessageTT>
+  bool tryPutMessageNoThrow(MessageTT&& message) {
+    return putMessageImpl(
+        std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
   }
 
   /**
@@ -385,17 +383,15 @@ class NotificationQueue {
    *   - std::runtime_error if the queue is currently draining
    *   - any other exception thrown by the MessageT move/copy constructor.
    */
-  void putMessage(MessageT&& message) {
-    putMessageImpl(std::move(message), 0);
-  }
-  void putMessage(const MessageT& message) {
-    putMessageImpl(message, 0);
+  template <typename MessageTT>
+  void putMessage(MessageTT&& message) {
+    putMessageImpl(std::forward<MessageTT>(message), 0);
   }
 
   /**
    * Put several messages on the queue.
    */
-  template<typename InputIteratorT>
+  template <typename InputIteratorT>
   void putMessages(InputIteratorT first, InputIteratorT last) {
     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
       IterCategory;
@@ -569,7 +565,8 @@ class NotificationQueue {
     }
   }
 
-  bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
+  template <typename MessageTT>
+  bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
     checkPid();
     bool signal = false;
     {
@@ -582,7 +579,8 @@ class NotificationQueue {
       if (numActiveConsumers_ < numConsumers_) {
         signal = true;
       }
-      queue_.emplace_back(std::move(message), RequestContext::saveContext());
+      queue_.emplace_back(
+          std::forward<MessageTT>(message), RequestContext::saveContext());
       if (signal) {
         ensureSignalLocked();
       }
@@ -590,27 +588,7 @@ class NotificationQueue {
     return true;
   }
 
-  bool putMessageImpl(
-    const MessageT& message, size_t maxSize, bool throws=true) {
-    checkPid();
-    bool signal = false;
-    {
-      folly::SpinLockGuard g(spinlock_);
-      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
-        return false;
-      }
-      if (numActiveConsumers_ < numConsumers_) {
-        signal = true;
-      }
-      queue_.emplace_back(message, RequestContext::saveContext());
-      if (signal) {
-        ensureSignalLocked();
-      }
-    }
-    return true;
-  }
-
-  template<typename InputIteratorT>
+  template <typename InputIteratorT>
   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
                        std::input_iterator_tag) {
     checkPid();
@@ -645,7 +623,7 @@ class NotificationQueue {
   bool draining_{false};
 };
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::destroy() {
   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
   // will be non-nullptr.  Mark the value that it points to, so that
@@ -658,13 +636,13 @@ void NotificationQueue<MessageT>::Consumer::destroy() {
   DelayedDestruction::destroy();
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
     noexcept {
   consumeMessages(false);
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::consumeMessages(
     bool isDrain, size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
@@ -774,11 +752,11 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
   }
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::init(
     EventBase* eventBase,
     NotificationQueue* queue) {
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
   assert(queue_ == nullptr);
   assert(!isHandlerRegistered());
   queue->checkPid();
@@ -800,7 +778,7 @@ void NotificationQueue<MessageT>::Consumer::init(
   }
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   if (queue_ == nullptr) {
     assert(!isHandlerRegistered());
@@ -819,7 +797,7 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   queue_ = nullptr;
 }
 
-template<typename MessageT>
+template <typename MessageT>
 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
     size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
@@ -883,4 +861,4 @@ NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
           std::forward<TCallback>(callback)));
 }
 
-} // folly
+} // namespace folly