Fix ASAN failure in folly/io/async/test/NotificationQueueTest.cpp.
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/DelayedDestruction.h>
25 #include <folly/io/async/Request.h>
26 #include <folly/Likely.h>
27 #include <folly/ScopeGuard.h>
28 #include <folly/SpinLock.h>
29
30 #include <glog/logging.h>
31 #include <deque>
32
33 #if __linux__ && !__ANDROID__
34 #define FOLLY_HAVE_EVENTFD
35 #include <folly/io/async/EventFDWrapper.h>
36 #endif
37
38 namespace folly {
39
40 /**
41  * A producer-consumer queue for passing messages between EventBase threads.
42  *
43  * Messages can be added to the queue from any thread.  Multiple consumers may
44  * listen to the queue from multiple EventBase threads.
45  *
46  * A NotificationQueue may not be destroyed while there are still consumers
47  * registered to receive events from the queue.  It is the user's
48  * responsibility to ensure that all consumers are unregistered before the
49  * queue is destroyed.
50  *
51  * MessageT should be MoveConstructible (i.e., must support either a move
52  * constructor or a copy constructor, or both).  Ideally it's move constructor
53  * (or copy constructor if no move constructor is provided) should never throw
54  * exceptions.  If the constructor may throw, the consumers could end up
55  * spinning trying to move a message off the queue and failing, and then
56  * retrying.
57  */
58 template<typename MessageT>
59 class NotificationQueue {
60  public:
61   /**
62    * A callback interface for consuming messages from the queue as they arrive.
63    */
64   class Consumer : public DelayedDestruction, private EventHandler {
65    public:
66     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
67
68     Consumer()
69       : queue_(nullptr),
70         destroyedFlagPtr_(nullptr),
71         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
72
73     /**
74      * messageAvailable() will be invoked whenever a new
75      * message is available from the pipe.
76      */
77     virtual void messageAvailable(MessageT&& message) = 0;
78
79     /**
80      * Begin consuming messages from the specified queue.
81      *
82      * messageAvailable() will be called whenever a message is available.  This
83      * consumer will continue to consume messages until stopConsuming() is
84      * called.
85      *
86      * A Consumer may only consume messages from a single NotificationQueue at
87      * a time.  startConsuming() should not be called if this consumer is
88      * already consuming.
89      */
90     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
91       init(eventBase, queue);
92       registerHandler(READ | PERSIST);
93     }
94
95     /**
96      * Same as above but registers this event handler as internal so that it
97      * doesn't count towards the pending reader count for the IOLoop.
98      */
99     void startConsumingInternal(
100         EventBase* eventBase, NotificationQueue* queue) {
101       init(eventBase, queue);
102       registerInternalHandler(READ | PERSIST);
103     }
104
105     /**
106      * Stop consuming messages.
107      *
108      * startConsuming() may be called again to resume consumption of messages
109      * at a later point in time.
110      */
111     void stopConsuming();
112
113     /**
114      * Consume messages off the queue until it is empty. No messages may be
115      * added to the queue while it is draining, so that the process is bounded.
116      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
117      * and tryPutMessageNoThrow will return false.
118      *
119      * @returns true if the queue was drained, false otherwise. In practice,
120      * this will only fail if someone else is already draining the queue.
121      */
122     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
123
124     /**
125      * Get the NotificationQueue that this consumer is currently consuming
126      * messages from.  Returns nullptr if the consumer is not currently
127      * consuming events from any queue.
128      */
129     NotificationQueue* getCurrentQueue() const {
130       return queue_;
131     }
132
133     /**
134      * Set a limit on how many messages this consumer will read each iteration
135      * around the event loop.
136      *
137      * This helps rate-limit how much work the Consumer will do each event loop
138      * iteration, to prevent it from starving other event handlers.
139      *
140      * A limit of 0 means no limit will be enforced.  If unset, the limit
141      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
142      */
143     void setMaxReadAtOnce(uint32_t maxAtOnce) {
144       maxReadAtOnce_ = maxAtOnce;
145     }
146     uint32_t getMaxReadAtOnce() const {
147       return maxReadAtOnce_;
148     }
149
150     EventBase* getEventBase() {
151       return base_;
152     }
153
154     void handlerReady(uint16_t events) noexcept override;
155
156    protected:
157
158     void destroy() override;
159
160     virtual ~Consumer() {}
161
162    private:
163     /**
164      * Consume messages off the the queue until
165      *   - the queue is empty (1), or
166      *   - until the consumer is destroyed, or
167      *   - until the consumer is uninstalled, or
168      *   - an exception is thrown in the course of dequeueing, or
169      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
170      *
171      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
172      */
173     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
174
175     void setActive(bool active, bool shouldLock = false) {
176       if (!queue_) {
177         active_ = active;
178         return;
179       }
180       if (shouldLock) {
181         queue_->spinlock_.lock();
182       }
183       if (!active_ && active) {
184         ++queue_->numActiveConsumers_;
185       } else if (active_ && !active) {
186         --queue_->numActiveConsumers_;
187       }
188       active_ = active;
189       if (shouldLock) {
190         queue_->spinlock_.unlock();
191       }
192     }
193     void init(EventBase* eventBase, NotificationQueue* queue);
194
195     NotificationQueue* queue_;
196     bool* destroyedFlagPtr_;
197     uint32_t maxReadAtOnce_;
198     EventBase* base_;
199     bool active_{false};
200   };
201
202   enum class FdType {
203     PIPE,
204 #ifdef FOLLY_HAVE_EVENTFD
205     EVENTFD,
206 #endif
207   };
208
209   /**
210    * Create a new NotificationQueue.
211    *
212    * If the maxSize parameter is specified, this sets the maximum queue size
213    * that will be enforced by tryPutMessage().  (This size is advisory, and may
214    * be exceeded if producers explicitly use putMessage() instead of
215    * tryPutMessage().)
216    *
217    * The fdType parameter determines the type of file descriptor used
218    * internally to signal message availability.  The default (eventfd) is
219    * preferable for performance and because it won't fail when the queue gets
220    * too long.  It is not available on on older and non-linux kernels, however.
221    * In this case the code will fall back to using a pipe, the parameter is
222    * mostly for testing purposes.
223    */
224   explicit NotificationQueue(uint32_t maxSize = 0,
225 #ifdef FOLLY_HAVE_EVENTFD
226                              FdType fdType = FdType::EVENTFD)
227 #else
228                              FdType fdType = FdType::PIPE)
229 #endif
230     : eventfd_(-1),
231       pipeFds_{-1, -1},
232       advisoryMaxQueueSize_(maxSize),
233       pid_(getpid()),
234       queue_() {
235
236     RequestContext::saveContext();
237
238 #ifdef FOLLY_HAVE_EVENTFD
239     if (fdType == FdType::EVENTFD) {
240       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
241       if (eventfd_ == -1) {
242         if (errno == ENOSYS || errno == EINVAL) {
243           // eventfd not availalble
244           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
245                      << errno << ", falling back to pipe mode (is your kernel "
246                      << "> 2.6.30?)";
247           fdType = FdType::PIPE;
248         } else {
249           // some other error
250           folly::throwSystemError("Failed to create eventfd for "
251                                   "NotificationQueue", errno);
252         }
253       }
254     }
255 #endif
256     if (fdType == FdType::PIPE) {
257       if (pipe(pipeFds_)) {
258         folly::throwSystemError("Failed to create pipe for NotificationQueue",
259                                 errno);
260       }
261       try {
262         // put both ends of the pipe into non-blocking mode
263         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
264           folly::throwSystemError("failed to put NotificationQueue pipe read "
265                                   "endpoint into non-blocking mode", errno);
266         }
267         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
268           folly::throwSystemError("failed to put NotificationQueue pipe write "
269                                   "endpoint into non-blocking mode", errno);
270         }
271       } catch (...) {
272         ::close(pipeFds_[0]);
273         ::close(pipeFds_[1]);
274         throw;
275       }
276     }
277   }
278
279   ~NotificationQueue() {
280     if (eventfd_ >= 0) {
281       ::close(eventfd_);
282       eventfd_ = -1;
283     }
284     if (pipeFds_[0] >= 0) {
285       ::close(pipeFds_[0]);
286       pipeFds_[0] = -1;
287     }
288     if (pipeFds_[1] >= 0) {
289       ::close(pipeFds_[1]);
290       pipeFds_[1] = -1;
291     }
292   }
293
294   /**
295    * Set the advisory maximum queue size.
296    *
297    * This maximum queue size affects calls to tryPutMessage().  Message
298    * producers can still use the putMessage() call to unconditionally put a
299    * message on the queue, ignoring the configured maximum queue size.  This
300    * can cause the queue size to exceed the configured maximum.
301    */
302   void setMaxQueueSize(uint32_t max) {
303     advisoryMaxQueueSize_ = max;
304   }
305
306   /**
307    * Attempt to put a message on the queue if the queue is not already full.
308    *
309    * If the queue is full, a std::overflow_error will be thrown.  The
310    * setMaxQueueSize() function controls the maximum queue size.
311    *
312    * If the queue is currently draining, an std::runtime_error will be thrown.
313    *
314    * This method may contend briefly on a spinlock if many threads are
315    * concurrently accessing the queue, but for all intents and purposes it will
316    * immediately place the message on the queue and return.
317    *
318    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
319    * may throw any other exception thrown by the MessageT move/copy
320    * constructor.
321    */
322   void tryPutMessage(MessageT&& message) {
323     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
324   }
325   void tryPutMessage(const MessageT& message) {
326     putMessageImpl(message, advisoryMaxQueueSize_);
327   }
328
329   /**
330    * No-throw versions of the above.  Instead returns true on success, false on
331    * failure.
332    *
333    * Only std::overflow_error (the common exception case) and std::runtime_error
334    * (which indicates that the queue is being drained) are prevented from being
335    * thrown. User code must still catch std::bad_alloc errors.
336    */
337   bool tryPutMessageNoThrow(MessageT&& message) {
338     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
339   }
340   bool tryPutMessageNoThrow(const MessageT& message) {
341     return putMessageImpl(message, advisoryMaxQueueSize_, false);
342   }
343
344   /**
345    * Unconditionally put a message on the queue.
346    *
347    * This method is like tryPutMessage(), but ignores the maximum queue size
348    * and always puts the message on the queue, even if the maximum queue size
349    * would be exceeded.
350    *
351    * putMessage() may throw
352    *   - std::bad_alloc if memory allocation fails, and may
353    *   - std::runtime_error if the queue is currently draining
354    *   - any other exception thrown by the MessageT move/copy constructor.
355    */
356   void putMessage(MessageT&& message) {
357     putMessageImpl(std::move(message), 0);
358   }
359   void putMessage(const MessageT& message) {
360     putMessageImpl(message, 0);
361   }
362
363   /**
364    * Put several messages on the queue.
365    */
366   template<typename InputIteratorT>
367   void putMessages(InputIteratorT first, InputIteratorT last) {
368     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
369       IterCategory;
370     putMessagesImpl(first, last, IterCategory());
371   }
372
373   /**
374    * Try to immediately pull a message off of the queue, without blocking.
375    *
376    * If a message is immediately available, the result parameter will be
377    * updated to contain the message contents and true will be returned.
378    *
379    * If no message is available, false will be returned and result will be left
380    * unmodified.
381    */
382   bool tryConsume(MessageT& result) {
383     checkPid();
384
385     try {
386
387       folly::SpinLockGuard g(spinlock_);
388
389       if (UNLIKELY(queue_.empty())) {
390         return false;
391       }
392
393       auto data = std::move(queue_.front());
394       result = data.first;
395       RequestContext::setContext(data.second);
396
397       queue_.pop_front();
398     } catch (...) {
399       // Handle an exception if the assignment operator happens to throw.
400       // We consumed an event but weren't able to pop the message off the
401       // queue.  Signal the event again since the message is still in the
402       // queue.
403       signalEvent(1);
404       throw;
405     }
406
407     return true;
408   }
409
410   int size() {
411     folly::SpinLockGuard g(spinlock_);
412     return queue_.size();
413   }
414
415   /**
416    * Check that the NotificationQueue is being used from the correct process.
417    *
418    * If you create a NotificationQueue in one process, then fork, and try to
419    * send messages to the queue from the child process, you're going to have a
420    * bad time.  Unfortunately users have (accidentally) run into this.
421    *
422    * Because we use an eventfd/pipe, the child process can actually signal the
423    * parent process that an event is ready.  However, it can't put anything on
424    * the parent's queue, so the parent wakes up and finds an empty queue.  This
425    * check ensures that we catch the problem in the misbehaving child process
426    * code, and crash before signalling the parent process.
427    */
428   void checkPid() const {
429     CHECK_EQ(pid_, getpid());
430   }
431
432  private:
433   // Forbidden copy constructor and assignment operator
434   NotificationQueue(NotificationQueue const &) = delete;
435   NotificationQueue& operator=(NotificationQueue const &) = delete;
436
437   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
438     DCHECK(0 == spinlock_.trylock());
439     if (maxSize > 0 && queue_.size() >= maxSize) {
440       if (throws) {
441         throw std::overflow_error("unable to add message to NotificationQueue: "
442                                   "queue is full");
443       }
444       return false;
445     }
446     return true;
447   }
448
449   inline bool checkDraining(bool throws=true) {
450     if (UNLIKELY(draining_ && throws)) {
451       throw std::runtime_error("queue is draining, cannot add message");
452     }
453     return draining_;
454   }
455
456   inline void signalEvent(size_t numAdded = 1) const {
457     static const uint8_t kPipeMessage[] = {
458       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
459     };
460
461     ssize_t bytes_written = 0;
462     ssize_t bytes_expected = 0;
463     if (eventfd_ >= 0) {
464       // eventfd(2) dictates that we must write a 64-bit integer
465       uint64_t numAdded64(numAdded);
466       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
467       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
468     } else {
469       // pipe semantics, add one message for each numAdded
470       bytes_expected = numAdded;
471       do {
472         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
473         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
474         if (rc < 0) {
475           // TODO: if the pipe is full, write will fail with EAGAIN.
476           // See task #1044651 for how this could be handled
477           break;
478         }
479         numAdded -= rc;
480         bytes_written += rc;
481       } while (numAdded > 0);
482     }
483     if (bytes_written != bytes_expected) {
484       folly::throwSystemError("failed to signal NotificationQueue after "
485                               "write", errno);
486     }
487   }
488
489   bool tryConsumeEvent() {
490     uint64_t value = 0;
491     ssize_t rc = -1;
492     if (eventfd_ >= 0) {
493       rc = ::read(eventfd_, &value, sizeof(value));
494     } else {
495       uint8_t value8;
496       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
497       value = value8;
498     }
499     if (rc < 0) {
500       // EAGAIN should pretty much be the only error we can ever get.
501       // This means someone else already processed the only available message.
502       assert(errno == EAGAIN);
503       return false;
504     }
505     assert(value == 1);
506     return true;
507   }
508
509   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
510     checkPid();
511     bool signal = false;
512     {
513       folly::SpinLockGuard g(spinlock_);
514       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
515         return false;
516       }
517       // We only need to signal an event if not all consumers are
518       // awake.
519       if (numActiveConsumers_ < numConsumers_) {
520         signal = true;
521       }
522       queue_.emplace_back(std::move(message), RequestContext::saveContext());
523     }
524     if (signal) {
525       signalEvent();
526     }
527     return true;
528   }
529
530   bool putMessageImpl(
531     const MessageT& message, size_t maxSize, bool throws=true) {
532     checkPid();
533     bool signal = false;
534     {
535       folly::SpinLockGuard g(spinlock_);
536       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
537         return false;
538       }
539       if (numActiveConsumers_ < numConsumers_) {
540         signal = true;
541       }
542       queue_.emplace_back(message, RequestContext::saveContext());
543     }
544     if (signal) {
545       signalEvent();
546     }
547     return true;
548   }
549
550   template<typename InputIteratorT>
551   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
552                        std::input_iterator_tag) {
553     checkPid();
554     bool signal = false;
555     size_t numAdded = 0;
556     {
557       folly::SpinLockGuard g(spinlock_);
558       checkDraining();
559       while (first != last) {
560         queue_.emplace_back(*first, RequestContext::saveContext());
561         ++first;
562         ++numAdded;
563       }
564       if (numActiveConsumers_ < numConsumers_) {
565         signal = true;
566       }
567     }
568     if (signal) {
569       signalEvent();
570     }
571   }
572
573   mutable folly::SpinLock spinlock_;
574   int eventfd_;
575   int pipeFds_[2]; // to fallback to on older/non-linux systems
576   uint32_t advisoryMaxQueueSize_;
577   pid_t pid_;
578   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
579   int numConsumers_{0};
580   std::atomic<int> numActiveConsumers_{0};
581   bool draining_{false};
582 };
583
584 template<typename MessageT>
585 void NotificationQueue<MessageT>::Consumer::destroy() {
586   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
587   // will be non-nullptr.  Mark the value that it points to, so that
588   // handlerReady() will know the callback is destroyed, and that it cannot
589   // access any member variables anymore.
590   if (destroyedFlagPtr_) {
591     *destroyedFlagPtr_ = true;
592   }
593   stopConsuming();
594   DelayedDestruction::destroy();
595 }
596
597 template<typename MessageT>
598 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
599     noexcept {
600   consumeMessages(false);
601 }
602
603 template<typename MessageT>
604 void NotificationQueue<MessageT>::Consumer::consumeMessages(
605     bool isDrain, size_t* numConsumed) noexcept {
606   DestructorGuard dg(this);
607   uint32_t numProcessed = 0;
608   bool firstRun = true;
609   setActive(true);
610   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
611   SCOPE_EXIT {
612     if (numConsumed != nullptr) {
613       *numConsumed = numProcessed;
614     }
615   };
616   while (true) {
617     // Try to decrement the eventfd.
618     //
619     // The eventfd is only used to wake up the consumer - there may or
620     // may not actually be an event available (another consumer may
621     // have read it).  We don't really care, we only care about
622     // emptying the queue.
623     if (!isDrain && firstRun) {
624       queue_->tryConsumeEvent();
625       firstRun = false;
626     }
627
628     // Now pop the message off of the queue.
629     //
630     // We have to manually acquire and release the spinlock here, rather than
631     // using SpinLockHolder since the MessageT has to be constructed while
632     // holding the spinlock and available after we release it.  SpinLockHolder
633     // unfortunately doesn't provide a release() method.  (We can't construct
634     // MessageT first since we have no guarantee that MessageT has a default
635     // constructor.
636     queue_->spinlock_.lock();
637     bool locked = true;
638
639     try {
640       if (UNLIKELY(queue_->queue_.empty())) {
641         // If there is no message, we've reached the end of the queue, return.
642         setActive(false);
643         queue_->spinlock_.unlock();
644         return;
645       }
646
647       // Pull a message off the queue.
648       auto& data = queue_->queue_.front();
649
650       MessageT msg(std::move(data.first));
651       auto old_ctx =
652         RequestContext::setContext(data.second);
653       queue_->queue_.pop_front();
654
655       // Check to see if the queue is empty now.
656       // We use this as an optimization to see if we should bother trying to
657       // loop again and read another message after invoking this callback.
658       bool wasEmpty = queue_->queue_.empty();
659       if (wasEmpty) {
660         setActive(false);
661       }
662
663       // Now unlock the spinlock before we invoke the callback.
664       queue_->spinlock_.unlock();
665       locked = false;
666
667       // Call the callback
668       bool callbackDestroyed = false;
669       CHECK(destroyedFlagPtr_ == nullptr);
670       destroyedFlagPtr_ = &callbackDestroyed;
671       messageAvailable(std::move(msg));
672       destroyedFlagPtr_ = nullptr;
673
674       RequestContext::setContext(old_ctx);
675
676       // If the callback was destroyed before it returned, we are done
677       if (callbackDestroyed) {
678         return;
679       }
680
681       // If the callback is no longer installed, we are done.
682       if (queue_ == nullptr) {
683         return;
684       }
685
686       // If we have hit maxReadAtOnce_, we are done.
687       ++numProcessed;
688       if (!isDrain && maxReadAtOnce_ > 0 &&
689           numProcessed >= maxReadAtOnce_) {
690         queue_->signalEvent(1);
691         return;
692       }
693
694       // If the queue was empty before we invoked the callback, it's probable
695       // that it is still empty now.  Just go ahead and return, rather than
696       // looping again and trying to re-read from the eventfd.  (If a new
697       // message had in fact arrived while we were invoking the callback, we
698       // will simply be woken up the next time around the event loop and will
699       // process the message then.)
700       if (wasEmpty) {
701         return;
702       }
703     } catch (const std::exception& ex) {
704       // This catch block is really just to handle the case where the MessageT
705       // constructor throws.  The messageAvailable() callback itself is
706       // declared as noexcept and should never throw.
707       //
708       // If the MessageT constructor does throw we try to handle it as best as
709       // we can, but we can't work miracles.  We will just ignore the error for
710       // now and return.  The next time around the event loop we will end up
711       // trying to read the message again.  If MessageT continues to throw we
712       // will never make forward progress and will keep trying each time around
713       // the event loop.
714       if (locked) {
715         // Unlock the spinlock.
716         queue_->spinlock_.unlock();
717
718         // Push a notification back on the eventfd since we didn't actually
719         // read the message off of the queue.
720         if (!isDrain) {
721           queue_->signalEvent(1);
722         }
723       }
724
725       return;
726     }
727   }
728 }
729
730 template<typename MessageT>
731 void NotificationQueue<MessageT>::Consumer::init(
732     EventBase* eventBase,
733     NotificationQueue* queue) {
734   assert(eventBase->isInEventBaseThread());
735   assert(queue_ == nullptr);
736   assert(!isHandlerRegistered());
737   queue->checkPid();
738
739   base_ = eventBase;
740
741   queue_ = queue;
742
743   {
744     folly::SpinLockGuard g(queue_->spinlock_);
745     queue_->numConsumers_++;
746   }
747   queue_->signalEvent();
748
749   if (queue_->eventfd_ >= 0) {
750     initHandler(eventBase, queue_->eventfd_);
751   } else {
752     initHandler(eventBase, queue_->pipeFds_[0]);
753   }
754 }
755
756 template<typename MessageT>
757 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
758   if (queue_ == nullptr) {
759     assert(!isHandlerRegistered());
760     return;
761   }
762
763   {
764     folly::SpinLockGuard g(queue_->spinlock_);
765     queue_->numConsumers_--;
766     setActive(false);
767   }
768
769   assert(isHandlerRegistered());
770   unregisterHandler();
771   detachEventBase();
772   queue_ = nullptr;
773 }
774
775 template<typename MessageT>
776 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
777     size_t* numConsumed) noexcept {
778   DestructorGuard dg(this);
779   {
780     folly::SpinLockGuard g(queue_->spinlock_);
781     if (queue_->draining_) {
782       return false;
783     }
784     queue_->draining_ = true;
785   }
786   consumeMessages(true, numConsumed);
787   {
788     folly::SpinLockGuard g(queue_->spinlock_);
789     queue_->draining_ = false;
790   }
791   return true;
792 }
793
794 } // folly