making thrift and folly header files compile clean with -Wunused-parameter
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/Request.h>
25 #include <folly/Likely.h>
26 #include <folly/ScopeGuard.h>
27 #include <folly/SpinLock.h>
28
29 #include <glog/logging.h>
30 #include <deque>
31
32 #if __linux__ && !__ANDROID__
33 #define FOLLY_HAVE_EVENTFD
34 #include <folly/io/async/EventFDWrapper.h>
35 #endif
36
37 namespace folly {
38
39 /**
40  * A producer-consumer queue for passing messages between EventBase threads.
41  *
42  * Messages can be added to the queue from any thread.  Multiple consumers may
43  * listen to the queue from multiple EventBase threads.
44  *
45  * A NotificationQueue may not be destroyed while there are still consumers
46  * registered to receive events from the queue.  It is the user's
47  * responsibility to ensure that all consumers are unregistered before the
48  * queue is destroyed.
49  *
50  * MessageT should be MoveConstructible (i.e., must support either a move
51  * constructor or a copy constructor, or both).  Ideally it's move constructor
52  * (or copy constructor if no move constructor is provided) should never throw
53  * exceptions.  If the constructor may throw, the consumers could end up
54  * spinning trying to move a message off the queue and failing, and then
55  * retrying.
56  */
57 template<typename MessageT>
58 class NotificationQueue {
59  public:
60   /**
61    * A callback interface for consuming messages from the queue as they arrive.
62    */
63   class Consumer : private EventHandler {
64    public:
65     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
66
67     Consumer()
68       : queue_(nullptr),
69         destroyedFlagPtr_(nullptr),
70         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
71
72     virtual ~Consumer();
73
74     /**
75      * messageAvailable() will be invoked whenever a new
76      * message is available from the pipe.
77      */
78     virtual void messageAvailable(MessageT&& message) = 0;
79
80     /**
81      * Begin consuming messages from the specified queue.
82      *
83      * messageAvailable() will be called whenever a message is available.  This
84      * consumer will continue to consume messages until stopConsuming() is
85      * called.
86      *
87      * A Consumer may only consume messages from a single NotificationQueue at
88      * a time.  startConsuming() should not be called if this consumer is
89      * already consuming.
90      */
91     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
92       init(eventBase, queue);
93       registerHandler(READ | PERSIST);
94     }
95
96     /**
97      * Same as above but registers this event handler as internal so that it
98      * doesn't count towards the pending reader count for the IOLoop.
99      */
100     void startConsumingInternal(
101         EventBase* eventBase, NotificationQueue* queue) {
102       init(eventBase, queue);
103       registerInternalHandler(READ | PERSIST);
104     }
105
106     /**
107      * Stop consuming messages.
108      *
109      * startConsuming() may be called again to resume consumption of messages
110      * at a later point in time.
111      */
112     void stopConsuming();
113
114     /**
115      * Consume messages off the queue until it is empty. No messages may be
116      * added to the queue while it is draining, so that the process is bounded.
117      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
118      * and tryPutMessageNoThrow will return false.
119      *
120      * @returns true if the queue was drained, false otherwise. In practice,
121      * this will only fail if someone else is already draining the queue.
122      */
123     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
124
125     /**
126      * Get the NotificationQueue that this consumer is currently consuming
127      * messages from.  Returns nullptr if the consumer is not currently
128      * consuming events from any queue.
129      */
130     NotificationQueue* getCurrentQueue() const {
131       return queue_;
132     }
133
134     /**
135      * Set a limit on how many messages this consumer will read each iteration
136      * around the event loop.
137      *
138      * This helps rate-limit how much work the Consumer will do each event loop
139      * iteration, to prevent it from starving other event handlers.
140      *
141      * A limit of 0 means no limit will be enforced.  If unset, the limit
142      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
143      */
144     void setMaxReadAtOnce(uint32_t maxAtOnce) {
145       maxReadAtOnce_ = maxAtOnce;
146     }
147     uint32_t getMaxReadAtOnce() const {
148       return maxReadAtOnce_;
149     }
150
151     EventBase* getEventBase() {
152       return base_;
153     }
154
155     virtual void handlerReady(uint16_t events) noexcept;
156
157    private:
158     /**
159      * Consume messages off the the queue until
160      *   - the queue is empty (1), or
161      *   - until the consumer is destroyed, or
162      *   - until the consumer is uninstalled, or
163      *   - an exception is thrown in the course of dequeueing, or
164      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
165      *
166      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
167      */
168     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
169
170     void setActive(bool active, bool shouldLock = false) {
171       if (!queue_) {
172         active_ = active;
173         return;
174       }
175       if (shouldLock) {
176         queue_->spinlock_.lock();
177       }
178       if (!active_ && active) {
179         ++queue_->numActiveConsumers_;
180       } else if (active_ && !active) {
181         --queue_->numActiveConsumers_;
182       }
183       active_ = active;
184       if (shouldLock) {
185         queue_->spinlock_.unlock();
186       }
187     }
188     void init(EventBase* eventBase, NotificationQueue* queue);
189
190     NotificationQueue* queue_;
191     bool* destroyedFlagPtr_;
192     uint32_t maxReadAtOnce_;
193     EventBase* base_;
194     bool active_{false};
195   };
196
197   enum class FdType {
198     PIPE,
199 #ifdef FOLLY_HAVE_EVENTFD
200     EVENTFD,
201 #endif
202   };
203
204   /**
205    * Create a new NotificationQueue.
206    *
207    * If the maxSize parameter is specified, this sets the maximum queue size
208    * that will be enforced by tryPutMessage().  (This size is advisory, and may
209    * be exceeded if producers explicitly use putMessage() instead of
210    * tryPutMessage().)
211    *
212    * The fdType parameter determines the type of file descriptor used
213    * internally to signal message availability.  The default (eventfd) is
214    * preferable for performance and because it won't fail when the queue gets
215    * too long.  It is not available on on older and non-linux kernels, however.
216    * In this case the code will fall back to using a pipe, the parameter is
217    * mostly for testing purposes.
218    */
219   explicit NotificationQueue(uint32_t maxSize = 0,
220 #ifdef FOLLY_HAVE_EVENTFD
221                              FdType fdType = FdType::EVENTFD)
222 #else
223                              FdType fdType = FdType::PIPE)
224 #endif
225     : eventfd_(-1),
226       pipeFds_{-1, -1},
227       advisoryMaxQueueSize_(maxSize),
228       pid_(getpid()),
229       queue_() {
230
231     RequestContext::saveContext();
232
233 #ifdef FOLLY_HAVE_EVENTFD
234     if (fdType == FdType::EVENTFD) {
235       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
236       if (eventfd_ == -1) {
237         if (errno == ENOSYS || errno == EINVAL) {
238           // eventfd not availalble
239           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
240                      << errno << ", falling back to pipe mode (is your kernel "
241                      << "> 2.6.30?)";
242           fdType = FdType::PIPE;
243         } else {
244           // some other error
245           folly::throwSystemError("Failed to create eventfd for "
246                                   "NotificationQueue", errno);
247         }
248       }
249     }
250 #endif
251     if (fdType == FdType::PIPE) {
252       if (pipe(pipeFds_)) {
253         folly::throwSystemError("Failed to create pipe for NotificationQueue",
254                                 errno);
255       }
256       try {
257         // put both ends of the pipe into non-blocking mode
258         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
259           folly::throwSystemError("failed to put NotificationQueue pipe read "
260                                   "endpoint into non-blocking mode", errno);
261         }
262         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
263           folly::throwSystemError("failed to put NotificationQueue pipe write "
264                                   "endpoint into non-blocking mode", errno);
265         }
266       } catch (...) {
267         ::close(pipeFds_[0]);
268         ::close(pipeFds_[1]);
269         throw;
270       }
271     }
272   }
273
274   ~NotificationQueue() {
275     if (eventfd_ >= 0) {
276       ::close(eventfd_);
277       eventfd_ = -1;
278     }
279     if (pipeFds_[0] >= 0) {
280       ::close(pipeFds_[0]);
281       pipeFds_[0] = -1;
282     }
283     if (pipeFds_[1] >= 0) {
284       ::close(pipeFds_[1]);
285       pipeFds_[1] = -1;
286     }
287   }
288
289   /**
290    * Set the advisory maximum queue size.
291    *
292    * This maximum queue size affects calls to tryPutMessage().  Message
293    * producers can still use the putMessage() call to unconditionally put a
294    * message on the queue, ignoring the configured maximum queue size.  This
295    * can cause the queue size to exceed the configured maximum.
296    */
297   void setMaxQueueSize(uint32_t max) {
298     advisoryMaxQueueSize_ = max;
299   }
300
301   /**
302    * Attempt to put a message on the queue if the queue is not already full.
303    *
304    * If the queue is full, a std::overflow_error will be thrown.  The
305    * setMaxQueueSize() function controls the maximum queue size.
306    *
307    * If the queue is currently draining, an std::runtime_error will be thrown.
308    *
309    * This method may contend briefly on a spinlock if many threads are
310    * concurrently accessing the queue, but for all intents and purposes it will
311    * immediately place the message on the queue and return.
312    *
313    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
314    * may throw any other exception thrown by the MessageT move/copy
315    * constructor.
316    */
317   void tryPutMessage(MessageT&& message) {
318     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
319   }
320   void tryPutMessage(const MessageT& message) {
321     putMessageImpl(message, advisoryMaxQueueSize_);
322   }
323
324   /**
325    * No-throw versions of the above.  Instead returns true on success, false on
326    * failure.
327    *
328    * Only std::overflow_error (the common exception case) and std::runtime_error
329    * (which indicates that the queue is being drained) are prevented from being
330    * thrown. User code must still catch std::bad_alloc errors.
331    */
332   bool tryPutMessageNoThrow(MessageT&& message) {
333     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
334   }
335   bool tryPutMessageNoThrow(const MessageT& message) {
336     return putMessageImpl(message, advisoryMaxQueueSize_, false);
337   }
338
339   /**
340    * Unconditionally put a message on the queue.
341    *
342    * This method is like tryPutMessage(), but ignores the maximum queue size
343    * and always puts the message on the queue, even if the maximum queue size
344    * would be exceeded.
345    *
346    * putMessage() may throw
347    *   - std::bad_alloc if memory allocation fails, and may
348    *   - std::runtime_error if the queue is currently draining
349    *   - any other exception thrown by the MessageT move/copy constructor.
350    */
351   void putMessage(MessageT&& message) {
352     putMessageImpl(std::move(message), 0);
353   }
354   void putMessage(const MessageT& message) {
355     putMessageImpl(message, 0);
356   }
357
358   /**
359    * Put several messages on the queue.
360    */
361   template<typename InputIteratorT>
362   void putMessages(InputIteratorT first, InputIteratorT last) {
363     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
364       IterCategory;
365     putMessagesImpl(first, last, IterCategory());
366   }
367
368   /**
369    * Try to immediately pull a message off of the queue, without blocking.
370    *
371    * If a message is immediately available, the result parameter will be
372    * updated to contain the message contents and true will be returned.
373    *
374    * If no message is available, false will be returned and result will be left
375    * unmodified.
376    */
377   bool tryConsume(MessageT& result) {
378     checkPid();
379
380     try {
381
382       folly::SpinLockGuard g(spinlock_);
383
384       if (UNLIKELY(queue_.empty())) {
385         return false;
386       }
387
388       auto data = std::move(queue_.front());
389       result = data.first;
390       RequestContext::setContext(data.second);
391
392       queue_.pop_front();
393     } catch (...) {
394       // Handle an exception if the assignment operator happens to throw.
395       // We consumed an event but weren't able to pop the message off the
396       // queue.  Signal the event again since the message is still in the
397       // queue.
398       signalEvent(1);
399       throw;
400     }
401
402     return true;
403   }
404
405   int size() {
406     folly::SpinLockGuard g(spinlock_);
407     return queue_.size();
408   }
409
410   /**
411    * Check that the NotificationQueue is being used from the correct process.
412    *
413    * If you create a NotificationQueue in one process, then fork, and try to
414    * send messages to the queue from the child process, you're going to have a
415    * bad time.  Unfortunately users have (accidentally) run into this.
416    *
417    * Because we use an eventfd/pipe, the child process can actually signal the
418    * parent process that an event is ready.  However, it can't put anything on
419    * the parent's queue, so the parent wakes up and finds an empty queue.  This
420    * check ensures that we catch the problem in the misbehaving child process
421    * code, and crash before signalling the parent process.
422    */
423   void checkPid() const {
424     CHECK_EQ(pid_, getpid());
425   }
426
427  private:
428   // Forbidden copy constructor and assignment operator
429   NotificationQueue(NotificationQueue const &) = delete;
430   NotificationQueue& operator=(NotificationQueue const &) = delete;
431
432   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
433     DCHECK(0 == spinlock_.trylock());
434     if (maxSize > 0 && queue_.size() >= maxSize) {
435       if (throws) {
436         throw std::overflow_error("unable to add message to NotificationQueue: "
437                                   "queue is full");
438       }
439       return false;
440     }
441     return true;
442   }
443
444   inline bool checkDraining(bool throws=true) {
445     if (UNLIKELY(draining_ && throws)) {
446       throw std::runtime_error("queue is draining, cannot add message");
447     }
448     return draining_;
449   }
450
451   inline void signalEvent(size_t numAdded = 1) const {
452     static const uint8_t kPipeMessage[] = {
453       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
454     };
455
456     ssize_t bytes_written = 0;
457     ssize_t bytes_expected = 0;
458     if (eventfd_ >= 0) {
459       // eventfd(2) dictates that we must write a 64-bit integer
460       uint64_t numAdded64(numAdded);
461       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
462       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
463     } else {
464       // pipe semantics, add one message for each numAdded
465       bytes_expected = numAdded;
466       do {
467         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
468         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
469         if (rc < 0) {
470           // TODO: if the pipe is full, write will fail with EAGAIN.
471           // See task #1044651 for how this could be handled
472           break;
473         }
474         numAdded -= rc;
475         bytes_written += rc;
476       } while (numAdded > 0);
477     }
478     if (bytes_written != bytes_expected) {
479       folly::throwSystemError("failed to signal NotificationQueue after "
480                               "write", errno);
481     }
482   }
483
484   bool tryConsumeEvent() {
485     uint64_t value = 0;
486     ssize_t rc = -1;
487     if (eventfd_ >= 0) {
488       rc = ::read(eventfd_, &value, sizeof(value));
489     } else {
490       uint8_t value8;
491       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
492       value = value8;
493     }
494     if (rc < 0) {
495       // EAGAIN should pretty much be the only error we can ever get.
496       // This means someone else already processed the only available message.
497       assert(errno == EAGAIN);
498       return false;
499     }
500     assert(value == 1);
501     return true;
502   }
503
504   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
505     checkPid();
506     bool signal = false;
507     {
508       folly::SpinLockGuard g(spinlock_);
509       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
510         return false;
511       }
512       // We only need to signal an event if not all consumers are
513       // awake.
514       if (numActiveConsumers_ < numConsumers_) {
515         signal = true;
516       }
517       queue_.emplace_back(std::move(message), RequestContext::saveContext());
518     }
519     if (signal) {
520       signalEvent();
521     }
522     return true;
523   }
524
525   bool putMessageImpl(
526     const MessageT& message, size_t maxSize, bool throws=true) {
527     checkPid();
528     bool signal = false;
529     {
530       folly::SpinLockGuard g(spinlock_);
531       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
532         return false;
533       }
534       if (numActiveConsumers_ < numConsumers_) {
535         signal = true;
536       }
537       queue_.emplace_back(message, RequestContext::saveContext());
538     }
539     if (signal) {
540       signalEvent();
541     }
542     return true;
543   }
544
545   template<typename InputIteratorT>
546   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
547                        std::input_iterator_tag) {
548     checkPid();
549     bool signal = false;
550     size_t numAdded = 0;
551     {
552       folly::SpinLockGuard g(spinlock_);
553       checkDraining();
554       while (first != last) {
555         queue_.emplace_back(*first, RequestContext::saveContext());
556         ++first;
557         ++numAdded;
558       }
559       if (numActiveConsumers_ < numConsumers_) {
560         signal = true;
561       }
562     }
563     if (signal) {
564       signalEvent();
565     }
566   }
567
568   mutable folly::SpinLock spinlock_;
569   int eventfd_;
570   int pipeFds_[2]; // to fallback to on older/non-linux systems
571   uint32_t advisoryMaxQueueSize_;
572   pid_t pid_;
573   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
574   int numConsumers_{0};
575   std::atomic<int> numActiveConsumers_{0};
576   bool draining_{false};
577 };
578
579 template<typename MessageT>
580 NotificationQueue<MessageT>::Consumer::~Consumer() {
581   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
582   // will be non-nullptr.  Mark the value that it points to, so that
583   // handlerReady() will know the callback is destroyed, and that it cannot
584   // access any member variables anymore.
585   if (destroyedFlagPtr_) {
586     *destroyedFlagPtr_ = true;
587   }
588 }
589
590 template<typename MessageT>
591 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
592     noexcept {
593   consumeMessages(false);
594 }
595
596 template<typename MessageT>
597 void NotificationQueue<MessageT>::Consumer::consumeMessages(
598     bool isDrain, size_t* numConsumed) noexcept {
599   uint32_t numProcessed = 0;
600   bool firstRun = true;
601   setActive(true);
602   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
603   SCOPE_EXIT {
604     if (numConsumed != nullptr) {
605       *numConsumed = numProcessed;
606     }
607   };
608   while (true) {
609     // Try to decrement the eventfd.
610     //
611     // The eventfd is only used to wake up the consumer - there may or
612     // may not actually be an event available (another consumer may
613     // have read it).  We don't really care, we only care about
614     // emptying the queue.
615     if (!isDrain && firstRun) {
616       queue_->tryConsumeEvent();
617       firstRun = false;
618     }
619
620     // Now pop the message off of the queue.
621     //
622     // We have to manually acquire and release the spinlock here, rather than
623     // using SpinLockHolder since the MessageT has to be constructed while
624     // holding the spinlock and available after we release it.  SpinLockHolder
625     // unfortunately doesn't provide a release() method.  (We can't construct
626     // MessageT first since we have no guarantee that MessageT has a default
627     // constructor.
628     queue_->spinlock_.lock();
629     bool locked = true;
630
631     try {
632       if (UNLIKELY(queue_->queue_.empty())) {
633         // If there is no message, we've reached the end of the queue, return.
634         setActive(false);
635         queue_->spinlock_.unlock();
636         return;
637       }
638
639       // Pull a message off the queue.
640       auto& data = queue_->queue_.front();
641
642       MessageT msg(std::move(data.first));
643       auto old_ctx =
644         RequestContext::setContext(data.second);
645       queue_->queue_.pop_front();
646
647       // Check to see if the queue is empty now.
648       // We use this as an optimization to see if we should bother trying to
649       // loop again and read another message after invoking this callback.
650       bool wasEmpty = queue_->queue_.empty();
651       if (wasEmpty) {
652         setActive(false);
653       }
654
655       // Now unlock the spinlock before we invoke the callback.
656       queue_->spinlock_.unlock();
657       locked = false;
658
659       // Call the callback
660       bool callbackDestroyed = false;
661       CHECK(destroyedFlagPtr_ == nullptr);
662       destroyedFlagPtr_ = &callbackDestroyed;
663       messageAvailable(std::move(msg));
664
665       RequestContext::setContext(old_ctx);
666
667       // If the callback was destroyed before it returned, we are done
668       if (callbackDestroyed) {
669         return;
670       }
671       destroyedFlagPtr_ = nullptr;
672
673       // If the callback is no longer installed, we are done.
674       if (queue_ == nullptr) {
675         return;
676       }
677
678       // If we have hit maxReadAtOnce_, we are done.
679       ++numProcessed;
680       if (!isDrain && maxReadAtOnce_ > 0 &&
681           numProcessed >= maxReadAtOnce_) {
682         queue_->signalEvent(1);
683         return;
684       }
685
686       // If the queue was empty before we invoked the callback, it's probable
687       // that it is still empty now.  Just go ahead and return, rather than
688       // looping again and trying to re-read from the eventfd.  (If a new
689       // message had in fact arrived while we were invoking the callback, we
690       // will simply be woken up the next time around the event loop and will
691       // process the message then.)
692       if (wasEmpty) {
693         return;
694       }
695     } catch (const std::exception& ex) {
696       // This catch block is really just to handle the case where the MessageT
697       // constructor throws.  The messageAvailable() callback itself is
698       // declared as noexcept and should never throw.
699       //
700       // If the MessageT constructor does throw we try to handle it as best as
701       // we can, but we can't work miracles.  We will just ignore the error for
702       // now and return.  The next time around the event loop we will end up
703       // trying to read the message again.  If MessageT continues to throw we
704       // will never make forward progress and will keep trying each time around
705       // the event loop.
706       if (locked) {
707         // Unlock the spinlock.
708         queue_->spinlock_.unlock();
709
710         // Push a notification back on the eventfd since we didn't actually
711         // read the message off of the queue.
712         if (!isDrain) {
713           queue_->signalEvent(1);
714         }
715       }
716
717       return;
718     }
719   }
720 }
721
722 template<typename MessageT>
723 void NotificationQueue<MessageT>::Consumer::init(
724     EventBase* eventBase,
725     NotificationQueue* queue) {
726   assert(eventBase->isInEventBaseThread());
727   assert(queue_ == nullptr);
728   assert(!isHandlerRegistered());
729   queue->checkPid();
730
731   base_ = eventBase;
732
733   queue_ = queue;
734
735   {
736     folly::SpinLockGuard g(queue_->spinlock_);
737     queue_->numConsumers_++;
738   }
739   queue_->signalEvent();
740
741   if (queue_->eventfd_ >= 0) {
742     initHandler(eventBase, queue_->eventfd_);
743   } else {
744     initHandler(eventBase, queue_->pipeFds_[0]);
745   }
746 }
747
748 template<typename MessageT>
749 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
750   if (queue_ == nullptr) {
751     assert(!isHandlerRegistered());
752     return;
753   }
754
755   {
756     folly::SpinLockGuard g(queue_->spinlock_);
757     queue_->numConsumers_--;
758     setActive(false);
759   }
760
761   assert(isHandlerRegistered());
762   unregisterHandler();
763   detachEventBase();
764   queue_ = nullptr;
765 }
766
767 template<typename MessageT>
768 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
769     size_t* numConsumed) noexcept {
770   {
771     folly::SpinLockGuard g(queue_->spinlock_);
772     if (queue_->draining_) {
773       return false;
774     }
775     queue_->draining_ = true;
776   }
777   consumeMessages(true, numConsumed);
778   {
779     folly::SpinLockGuard g(queue_->spinlock_);
780     queue_->draining_ = false;
781   }
782   return true;
783 }
784
785 } // folly