Lift thrift/lib/cpp/test/TNotificationQueueTest.
[folly.git] / folly / io / async / NotificationQueue.h
index 6bdf16fae361fdca48467fb2e0306ffdeaa35e28..46c6a8911492e84e535264742bc4e1bf4c0b3f65 100644 (file)
@@ -120,7 +120,7 @@ class NotificationQueue {
      * @returns true if the queue was drained, false otherwise. In practice,
      * this will only fail if someone else is already draining the queue.
      */
-    bool consumeUntilDrained() noexcept;
+    bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
 
     /**
      * Get the NotificationQueue that this consumer is currently consuming
@@ -165,7 +165,7 @@ class NotificationQueue {
      *
      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
      */
-    void consumeMessages(bool isDrain) noexcept;
+    void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
 
     void setActive(bool active, bool shouldLock = false) {
       if (!queue_) {
@@ -595,11 +595,16 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
 template<typename MessageT>
 void NotificationQueue<MessageT>::Consumer::consumeMessages(
-    bool isDrain) noexcept {
+    bool isDrain, size_t* numConsumed) noexcept {
   uint32_t numProcessed = 0;
   bool firstRun = true;
   setActive(true);
   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
+  SCOPE_EXIT {
+    if (numConsumed != nullptr) {
+      *numConsumed = numProcessed;
+    }
+  };
   while (true) {
     // Try to decrement the eventfd.
     //
@@ -760,7 +765,8 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
 }
 
 template<typename MessageT>
-bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
+    size_t* numConsumed) noexcept {
   {
     folly::SpinLockGuard g(queue_->spinlock_);
     if (queue_->draining_) {
@@ -768,7 +774,7 @@ bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
     }
     queue_->draining_ = true;
   }
-  consumeMessages(true);
+  consumeMessages(true, numConsumed);
   {
     folly::SpinLockGuard g(queue_->spinlock_);
     queue_->draining_ = false;