Add unit test for timeout=0
[folly.git] / folly / io / async / NotificationQueue.h
index 86a2b95b85861bae9ad7f2de9a6b5c6dbbbdd3be..e1b1cb36fe29b3ea4dd4149d28fd1d517b13d0bf 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,9 +16,7 @@
 
 #pragma once
 
-#include <poll.h>
 #include <sys/types.h>
-#include <unistd.h>
 
 #include <algorithm>
 #include <deque>
 #include <stdexcept>
 #include <utility>
 
+#include <folly/Exception.h>
 #include <folly/FileUtil.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/EventHandler.h>
-#include <folly/io/async/DelayedDestruction.h>
-#include <folly/io/async/Request.h>
 #include <folly/Likely.h>
 #include <folly/ScopeGuard.h>
 #include <folly/SpinLock.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/Request.h>
 #include <folly/portability/Fcntl.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
 
 #include <glog/logging.h>
 
@@ -64,7 +65,7 @@ namespace folly {
  * spinning trying to move a message off the queue and failing, and then
  * retrying.
  */
-template<typename MessageT>
+template <typename MessageT>
 class NotificationQueue {
  public:
   /**
@@ -88,7 +89,7 @@ class NotificationQueue {
      * messageAvailable() will be invoked whenever a new
      * message is available from the pipe.
      */
-    virtual void messageAvailable(MessageT&& message) = 0;
+    virtual void messageAvailable(MessageT&& message) noexcept = 0;
 
     /**
      * Begin consuming messages from the specified queue.
@@ -171,7 +172,7 @@ class NotificationQueue {
 
     void destroy() override;
 
-    virtual ~Consumer() {}
+    ~Consumer() override {}
 
    private:
     /**
@@ -351,11 +352,9 @@ class NotificationQueue {
    * may throw any other exception thrown by the MessageT move/copy
    * constructor.
    */
-  void tryPutMessage(MessageT&& message) {
-    putMessageImpl(std::move(message), advisoryMaxQueueSize_);
-  }
-  void tryPutMessage(const MessageT& message) {
-    putMessageImpl(message, advisoryMaxQueueSize_);
+  template <typename MessageTT>
+  void tryPutMessage(MessageTT&& message) {
+    putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
   }
 
   /**
@@ -366,11 +365,10 @@ class NotificationQueue {
    * (which indicates that the queue is being drained) are prevented from being
    * thrown. User code must still catch std::bad_alloc errors.
    */
-  bool tryPutMessageNoThrow(MessageT&& message) {
-    return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
-  }
-  bool tryPutMessageNoThrow(const MessageT& message) {
-    return putMessageImpl(message, advisoryMaxQueueSize_, false);
+  template <typename MessageTT>
+  bool tryPutMessageNoThrow(MessageTT&& message) {
+    return putMessageImpl(
+        std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
   }
 
   /**
@@ -385,17 +383,15 @@ class NotificationQueue {
    *   - std::runtime_error if the queue is currently draining
    *   - any other exception thrown by the MessageT move/copy constructor.
    */
-  void putMessage(MessageT&& message) {
-    putMessageImpl(std::move(message), 0);
-  }
-  void putMessage(const MessageT& message) {
-    putMessageImpl(message, 0);
+  template <typename MessageTT>
+  void putMessage(MessageTT&& message) {
+    putMessageImpl(std::forward<MessageTT>(message), 0);
   }
 
   /**
    * Put several messages on the queue.
    */
-  template<typename InputIteratorT>
+  template <typename InputIteratorT>
   void putMessages(InputIteratorT first, InputIteratorT last) {
     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
       IterCategory;
@@ -422,9 +418,9 @@ class NotificationQueue {
       return false;
     }
 
-    auto data = std::move(queue_.front());
-    result = data.first;
-    RequestContext::setContext(data.second);
+    auto& data = queue_.front();
+    result = std::move(data.first);
+    RequestContext::setContext(std::move(data.second));
 
     queue_.pop_front();
 
@@ -457,7 +453,7 @@ class NotificationQueue {
   NotificationQueue& operator=(NotificationQueue const &) = delete;
 
   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
-    DCHECK(0 == spinlock_.trylock());
+    DCHECK(0 == spinlock_.try_lock());
     if (maxSize > 0 && queue_.size() >= maxSize) {
       if (throws) {
         throw std::overflow_error("unable to add message to NotificationQueue: "
@@ -488,17 +484,17 @@ class NotificationQueue {
     }
 
     ssize_t bytes_written = 0;
-    ssize_t bytes_expected = 0;
+    size_t bytes_expected = 0;
 
     do {
       if (eventfd_ >= 0) {
         // eventfd(2) dictates that we must write a 64-bit integer
         uint64_t signal = 1;
-        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_expected = sizeof(signal);
         bytes_written = ::write(eventfd_, &signal, bytes_expected);
       } else {
         uint8_t signal = 1;
-        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_expected = sizeof(signal);
         bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
       }
     } while (bytes_written == -1 && errno == EINTR);
@@ -510,7 +506,7 @@ class NotificationQueue {
     }
 #endif
 
-    if (bytes_written == bytes_expected) {
+    if (bytes_written == ssize_t(bytes_expected)) {
       signal_ = true;
     } else {
 #ifdef __ANDROID__
@@ -569,7 +565,8 @@ class NotificationQueue {
     }
   }
 
-  bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
+  template <typename MessageTT>
+  bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
     checkPid();
     bool signal = false;
     {
@@ -582,7 +579,8 @@ class NotificationQueue {
       if (numActiveConsumers_ < numConsumers_) {
         signal = true;
       }
-      queue_.emplace_back(std::move(message), RequestContext::saveContext());
+      queue_.emplace_back(
+          std::forward<MessageTT>(message), RequestContext::saveContext());
       if (signal) {
         ensureSignalLocked();
       }
@@ -590,27 +588,7 @@ class NotificationQueue {
     return true;
   }
 
-  bool putMessageImpl(
-    const MessageT& message, size_t maxSize, bool throws=true) {
-    checkPid();
-    bool signal = false;
-    {
-      folly::SpinLockGuard g(spinlock_);
-      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
-        return false;
-      }
-      if (numActiveConsumers_ < numConsumers_) {
-        signal = true;
-      }
-      queue_.emplace_back(message, RequestContext::saveContext());
-      if (signal) {
-        ensureSignalLocked();
-      }
-    }
-    return true;
-  }
-
-  template<typename InputIteratorT>
+  template <typename InputIteratorT>
   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
                        std::input_iterator_tag) {
     checkPid();
@@ -645,7 +623,7 @@ class NotificationQueue {
   bool draining_{false};
 };
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::destroy() {
   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
   // will be non-nullptr.  Mark the value that it points to, so that
@@ -658,13 +636,13 @@ void NotificationQueue<MessageT>::Consumer::destroy() {
   DelayedDestruction::destroy();
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
     noexcept {
   consumeMessages(false);
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::consumeMessages(
     bool isDrain, size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
@@ -753,7 +731,7 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       if (wasEmpty) {
         return;
       }
-    } catch (const std::exception& ex) {
+    } catch (const std::exception&) {
       // This catch block is really just to handle the case where the MessageT
       // constructor throws.  The messageAvailable() callback itself is
       // declared as noexcept and should never throw.
@@ -774,11 +752,11 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
   }
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::init(
     EventBase* eventBase,
     NotificationQueue* queue) {
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
   assert(queue_ == nullptr);
   assert(!isHandlerRegistered());
   queue->checkPid();
@@ -800,7 +778,7 @@ void NotificationQueue<MessageT>::Consumer::init(
   }
 }
 
-template<typename MessageT>
+template <typename MessageT>
 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   if (queue_ == nullptr) {
     assert(!isHandlerRegistered());
@@ -819,7 +797,7 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   queue_ = nullptr;
 }
 
-template<typename MessageT>
+template <typename MessageT>
 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
     size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
@@ -855,7 +833,7 @@ struct notification_queue_consumer_wrapper
       : callback_(std::forward<UCallback>(callback)) {}
 
   // we are being stricter here and requiring noexcept for callback
-  void messageAvailable(MessageT&& message) override {
+  void messageAvailable(MessageT&& message) noexcept override {
     static_assert(
       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
@@ -883,4 +861,4 @@ NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
           std::forward<TCallback>(callback)));
 }
 
-} // folly
+} // namespace folly