Switch uses of <unistd.h> to <folly/portability/Unistd.h>
[folly.git] / folly / io / async / NotificationQueue.h
index 7c8b2ee9684c155252ecc8b1f28f98cb7e89ae56..0f26d88d51a13546f61d21bbce11af247370344f 100644 (file)
@@ -16,9 +16,8 @@
 
 #pragma once
 
-#include <fcntl.h>
+#include <poll.h>
 #include <sys/types.h>
-#include <unistd.h>
 
 #include <algorithm>
 #include <deque>
@@ -35,6 +34,8 @@
 #include <folly/Likely.h>
 #include <folly/ScopeGuard.h>
 #include <folly/SpinLock.h>
+#include <folly/portability/Fcntl.h>
+#include <folly/portability/Unistd.h>
 
 #include <glog/logging.h>
 
@@ -212,6 +213,24 @@ class NotificationQueue {
     bool active_{false};
   };
 
+  class SimpleConsumer {
+   public:
+    explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
+      ++queue_.numConsumers_;
+    }
+
+    ~SimpleConsumer() {
+      --queue_.numConsumers_;
+    }
+
+    int getFd() const {
+      return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
+    }
+
+   private:
+    NotificationQueue& queue_;
+  };
+
   enum class FdType {
     PIPE,
 #ifdef FOLLY_HAVE_EVENTFD
@@ -250,7 +269,7 @@ class NotificationQueue {
 
 #ifdef FOLLY_HAVE_EVENTFD
     if (fdType == FdType::EVENTFD) {
-      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
+      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
       if (eventfd_ == -1) {
         if (errno == ENOSYS || errno == EINVAL) {
           // eventfd not availalble
@@ -393,29 +412,21 @@ class NotificationQueue {
    * unmodified.
    */
   bool tryConsume(MessageT& result) {
+    SCOPE_EXIT { syncSignalAndQueue(); };
+
     checkPid();
 
-    try {
+    folly::SpinLockGuard g(spinlock_);
 
-      folly::SpinLockGuard g(spinlock_);
+    if (UNLIKELY(queue_.empty())) {
+      return false;
+    }
 
-      if (UNLIKELY(queue_.empty())) {
-        return false;
-      }
+    auto data = std::move(queue_.front());
+    result = data.first;
+    RequestContext::setContext(data.second);
 
-      auto data = std::move(queue_.front());
-      result = data.first;
-      RequestContext::setContext(data.second);
-
-      queue_.pop_front();
-    } catch (...) {
-      // Handle an exception if the assignment operator happens to throw.
-      // We consumed an event but weren't able to pop the message off the
-      // queue.  Signal the event again since the message is still in the
-      // queue.
-      signalEvent(1);
-      throw;
-    }
+    queue_.pop_front();
 
     return true;
   }
@@ -470,39 +481,38 @@ class NotificationQueue {
   mutable std::atomic<int> maxEventBytes_{0};
 #endif
 
-  inline void signalEvent(size_t numAdded = 1) const {
-    static const uint8_t kPipeMessage[] = {
-      1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
-    };
+  void ensureSignalLocked() const {
+    // semantics: empty fd == empty queue <=> !signal_
+    if (signal_) {
+      return;
+    }
 
     ssize_t bytes_written = 0;
     ssize_t bytes_expected = 0;
-    if (eventfd_ >= 0) {
-      // eventfd(2) dictates that we must write a 64-bit integer
-      uint64_t numAdded64(numAdded);
-      bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
-      bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
-    } else {
-      // pipe semantics, add one message for each numAdded
-      bytes_expected = numAdded;
-      do {
-        size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
-        ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
-        if (rc < 0) {
-          // TODO: if the pipe is full, write will fail with EAGAIN.
-          // See task #1044651 for how this could be handled
-          break;
-        }
-        numAdded -= rc;
-        bytes_written += rc;
-      } while (numAdded > 0);
-    }
+
+    do {
+      if (eventfd_ >= 0) {
+        // eventfd(2) dictates that we must write a 64-bit integer
+        uint64_t signal = 1;
+        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_written = ::write(eventfd_, &signal, bytes_expected);
+      } else {
+        uint8_t signal = 1;
+        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
+      }
+    } while (bytes_written == -1 && errno == EINTR);
+
 #ifdef __ANDROID__
-    eventBytes_ += bytes_written;
-    maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
+    if (bytes_written > 0) {
+      eventBytes_ += bytes_written;
+      maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
+    }
 #endif
 
-    if (bytes_written != bytes_expected) {
+    if (bytes_written == bytes_expected) {
+      signal_ = true;
+    } else {
 #ifdef __ANDROID__
       LOG(ERROR) << "NotificationQueue Write Error=" << errno
                  << " bytesInPipe=" << eventBytes_
@@ -513,27 +523,50 @@ class NotificationQueue {
     }
   }
 
-  bool tryConsumeEvent() {
-    uint64_t value = 0;
-    ssize_t rc = -1;
-    if (eventfd_ >= 0) {
-      rc = readNoInt(eventfd_, &value, sizeof(value));
+  void drainSignalsLocked() {
+    ssize_t bytes_read = 0;
+    if (eventfd_ > 0) {
+      uint64_t message;
+      bytes_read = readNoInt(eventfd_, &message, sizeof(message));
+      CHECK(bytes_read != -1 || errno == EAGAIN);
     } else {
-      uint8_t value8;
-      rc = readNoInt(pipeFds_[0], &value8, sizeof(value8));
-      value = value8;
+      // There should only be one byte in the pipe. To avoid potential leaks we still drain.
+      uint8_t message[32];
+      ssize_t result;
+      while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
+        bytes_read += result;
+      }
+      CHECK(result == -1 && errno == EAGAIN);
+      LOG_IF(ERROR, bytes_read > 1)
+        << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
+        << bytes_read << " bytes, expected <= 1";
+    }
+    LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
+      << "[NotificationQueue] Unexpected state while draining signals: signal_="
+      << signal_ << " bytes_read=" << bytes_read;
+
+    signal_ = false;
+
 #ifdef __ANDROID__
-      eventBytes_ -= 1;
-#endif
+    if (bytes_read > 0) {
+      eventBytes_ -= bytes_read;
     }
-    if (rc < 0) {
-      // EAGAIN should pretty much be the only error we can ever get.
-      // This means someone else already processed the only available message.
-      CHECK_EQ(errno, EAGAIN);
-      return false;
+#endif
+  }
+
+  void ensureSignal() const {
+    folly::SpinLockGuard g(spinlock_);
+    ensureSignalLocked();
+  }
+
+  void syncSignalAndQueue() {
+    folly::SpinLockGuard g(spinlock_);
+
+    if (queue_.empty()) {
+      drainSignalsLocked();
+    } else {
+      ensureSignalLocked();
     }
-    assert(value == 1);
-    return true;
   }
 
   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
@@ -550,9 +583,9 @@ class NotificationQueue {
         signal = true;
       }
       queue_.emplace_back(std::move(message), RequestContext::saveContext());
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
     return true;
   }
@@ -570,9 +603,9 @@ class NotificationQueue {
         signal = true;
       }
       queue_.emplace_back(message, RequestContext::saveContext());
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
     return true;
   }
@@ -594,13 +627,14 @@ class NotificationQueue {
       if (numActiveConsumers_ < numConsumers_) {
         signal = true;
       }
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
   }
 
   mutable folly::SpinLock spinlock_;
+  mutable bool signal_{false};
   int eventfd_;
   int pipeFds_[2]; // to fallback to on older/non-linux systems
   uint32_t advisoryMaxQueueSize_;
@@ -635,8 +669,12 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
     bool isDrain, size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
   uint32_t numProcessed = 0;
-  bool firstRun = true;
   setActive(true);
+  SCOPE_EXIT {
+    if (queue_) {
+      queue_->syncSignalAndQueue();
+    }
+  };
   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
   SCOPE_EXIT {
     if (numConsumed != nullptr) {
@@ -644,17 +682,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
     }
   };
   while (true) {
-    // Try to decrement the eventfd.
-    //
-    // The eventfd is only used to wake up the consumer - there may or
-    // may not actually be an event available (another consumer may
-    // have read it).  We don't really care, we only care about
-    // emptying the queue.
-    if (!isDrain && firstRun) {
-      queue_->tryConsumeEvent();
-      firstRun = false;
-    }
-
     // Now pop the message off of the queue.
     //
     // We have to manually acquire and release the spinlock here, rather than
@@ -678,8 +705,7 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       auto& data = queue_->queue_.front();
 
       MessageT msg(std::move(data.first));
-      auto old_ctx =
-        RequestContext::setContext(data.second);
+      RequestContextScopeGuard rctx(std::move(data.second));
       queue_->queue_.pop_front();
 
       // Check to see if the queue is empty now.
@@ -701,8 +727,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       messageAvailable(std::move(msg));
       destroyedFlagPtr_ = nullptr;
 
-      RequestContext::setContext(old_ctx);
-
       // If the callback was destroyed before it returned, we are done
       if (callbackDestroyed) {
         return;
@@ -717,7 +741,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       ++numProcessed;
       if (!isDrain && maxReadAtOnce_ > 0 &&
           numProcessed >= maxReadAtOnce_) {
-        queue_->signalEvent(1);
         return;
       }
 
@@ -744,12 +767,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       if (locked) {
         // Unlock the spinlock.
         queue_->spinlock_.unlock();
-
-        // Push a notification back on the eventfd since we didn't actually
-        // read the message off of the queue.
-        if (!isDrain) {
-          queue_->signalEvent(1);
-        }
       }
 
       return;
@@ -774,7 +791,7 @@ void NotificationQueue<MessageT>::Consumer::init(
     folly::SpinLockGuard g(queue_->spinlock_);
     queue_->numConsumers_++;
   }
-  queue_->signalEvent();
+  queue_->ensureSignal();
 
   if (queue_->eventfd_ >= 0) {
     initHandler(eventBase, queue_->eventfd_);