Change NotificationQueue assert -> CHECK
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22
23 #include <algorithm>
24 #include <deque>
25 #include <iterator>
26 #include <memory>
27 #include <stdexcept>
28 #include <utility>
29
30 #include <folly/io/async/EventBase.h>
31 #include <folly/io/async/EventHandler.h>
32 #include <folly/io/async/DelayedDestruction.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/Likely.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/SpinLock.h>
37
38 #include <glog/logging.h>
39
40 #if __linux__ && !__ANDROID__
41 #define FOLLY_HAVE_EVENTFD
42 #include <folly/io/async/EventFDWrapper.h>
43 #endif
44
45 namespace folly {
46
47 /**
48  * A producer-consumer queue for passing messages between EventBase threads.
49  *
50  * Messages can be added to the queue from any thread.  Multiple consumers may
51  * listen to the queue from multiple EventBase threads.
52  *
53  * A NotificationQueue may not be destroyed while there are still consumers
54  * registered to receive events from the queue.  It is the user's
55  * responsibility to ensure that all consumers are unregistered before the
56  * queue is destroyed.
57  *
58  * MessageT should be MoveConstructible (i.e., must support either a move
59  * constructor or a copy constructor, or both).  Ideally it's move constructor
60  * (or copy constructor if no move constructor is provided) should never throw
61  * exceptions.  If the constructor may throw, the consumers could end up
62  * spinning trying to move a message off the queue and failing, and then
63  * retrying.
64  */
65 template<typename MessageT>
66 class NotificationQueue {
67  public:
68   /**
69    * A callback interface for consuming messages from the queue as they arrive.
70    */
71   class Consumer : public DelayedDestruction, private EventHandler {
72    public:
73     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
74
75     Consumer()
76       : queue_(nullptr),
77         destroyedFlagPtr_(nullptr),
78         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
79
80     /**
81      * messageAvailable() will be invoked whenever a new
82      * message is available from the pipe.
83      */
84     virtual void messageAvailable(MessageT&& message) = 0;
85
86     /**
87      * Begin consuming messages from the specified queue.
88      *
89      * messageAvailable() will be called whenever a message is available.  This
90      * consumer will continue to consume messages until stopConsuming() is
91      * called.
92      *
93      * A Consumer may only consume messages from a single NotificationQueue at
94      * a time.  startConsuming() should not be called if this consumer is
95      * already consuming.
96      */
97     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
98       init(eventBase, queue);
99       registerHandler(READ | PERSIST);
100     }
101
102     /**
103      * Same as above but registers this event handler as internal so that it
104      * doesn't count towards the pending reader count for the IOLoop.
105      */
106     void startConsumingInternal(
107         EventBase* eventBase, NotificationQueue* queue) {
108       init(eventBase, queue);
109       registerInternalHandler(READ | PERSIST);
110     }
111
112     /**
113      * Stop consuming messages.
114      *
115      * startConsuming() may be called again to resume consumption of messages
116      * at a later point in time.
117      */
118     void stopConsuming();
119
120     /**
121      * Consume messages off the queue until it is empty. No messages may be
122      * added to the queue while it is draining, so that the process is bounded.
123      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
124      * and tryPutMessageNoThrow will return false.
125      *
126      * @returns true if the queue was drained, false otherwise. In practice,
127      * this will only fail if someone else is already draining the queue.
128      */
129     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
130
131     /**
132      * Get the NotificationQueue that this consumer is currently consuming
133      * messages from.  Returns nullptr if the consumer is not currently
134      * consuming events from any queue.
135      */
136     NotificationQueue* getCurrentQueue() const {
137       return queue_;
138     }
139
140     /**
141      * Set a limit on how many messages this consumer will read each iteration
142      * around the event loop.
143      *
144      * This helps rate-limit how much work the Consumer will do each event loop
145      * iteration, to prevent it from starving other event handlers.
146      *
147      * A limit of 0 means no limit will be enforced.  If unset, the limit
148      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
149      */
150     void setMaxReadAtOnce(uint32_t maxAtOnce) {
151       maxReadAtOnce_ = maxAtOnce;
152     }
153     uint32_t getMaxReadAtOnce() const {
154       return maxReadAtOnce_;
155     }
156
157     EventBase* getEventBase() {
158       return base_;
159     }
160
161     void handlerReady(uint16_t events) noexcept override;
162
163    protected:
164
165     void destroy() override;
166
167     virtual ~Consumer() {}
168
169    private:
170     /**
171      * Consume messages off the the queue until
172      *   - the queue is empty (1), or
173      *   - until the consumer is destroyed, or
174      *   - until the consumer is uninstalled, or
175      *   - an exception is thrown in the course of dequeueing, or
176      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
177      *
178      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
179      */
180     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
181
182     void setActive(bool active, bool shouldLock = false) {
183       if (!queue_) {
184         active_ = active;
185         return;
186       }
187       if (shouldLock) {
188         queue_->spinlock_.lock();
189       }
190       if (!active_ && active) {
191         ++queue_->numActiveConsumers_;
192       } else if (active_ && !active) {
193         --queue_->numActiveConsumers_;
194       }
195       active_ = active;
196       if (shouldLock) {
197         queue_->spinlock_.unlock();
198       }
199     }
200     void init(EventBase* eventBase, NotificationQueue* queue);
201
202     NotificationQueue* queue_;
203     bool* destroyedFlagPtr_;
204     uint32_t maxReadAtOnce_;
205     EventBase* base_;
206     bool active_{false};
207   };
208
209   enum class FdType {
210     PIPE,
211 #ifdef FOLLY_HAVE_EVENTFD
212     EVENTFD,
213 #endif
214   };
215
216   /**
217    * Create a new NotificationQueue.
218    *
219    * If the maxSize parameter is specified, this sets the maximum queue size
220    * that will be enforced by tryPutMessage().  (This size is advisory, and may
221    * be exceeded if producers explicitly use putMessage() instead of
222    * tryPutMessage().)
223    *
224    * The fdType parameter determines the type of file descriptor used
225    * internally to signal message availability.  The default (eventfd) is
226    * preferable for performance and because it won't fail when the queue gets
227    * too long.  It is not available on on older and non-linux kernels, however.
228    * In this case the code will fall back to using a pipe, the parameter is
229    * mostly for testing purposes.
230    */
231   explicit NotificationQueue(uint32_t maxSize = 0,
232 #ifdef FOLLY_HAVE_EVENTFD
233                              FdType fdType = FdType::EVENTFD)
234 #else
235                              FdType fdType = FdType::PIPE)
236 #endif
237     : eventfd_(-1),
238       pipeFds_{-1, -1},
239       advisoryMaxQueueSize_(maxSize),
240       pid_(getpid()),
241       queue_() {
242
243     RequestContext::saveContext();
244
245 #ifdef FOLLY_HAVE_EVENTFD
246     if (fdType == FdType::EVENTFD) {
247       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
248       if (eventfd_ == -1) {
249         if (errno == ENOSYS || errno == EINVAL) {
250           // eventfd not availalble
251           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
252                      << errno << ", falling back to pipe mode (is your kernel "
253                      << "> 2.6.30?)";
254           fdType = FdType::PIPE;
255         } else {
256           // some other error
257           folly::throwSystemError("Failed to create eventfd for "
258                                   "NotificationQueue", errno);
259         }
260       }
261     }
262 #endif
263     if (fdType == FdType::PIPE) {
264       if (pipe(pipeFds_)) {
265         folly::throwSystemError("Failed to create pipe for NotificationQueue",
266                                 errno);
267       }
268       try {
269         // put both ends of the pipe into non-blocking mode
270         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
271           folly::throwSystemError("failed to put NotificationQueue pipe read "
272                                   "endpoint into non-blocking mode", errno);
273         }
274         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
275           folly::throwSystemError("failed to put NotificationQueue pipe write "
276                                   "endpoint into non-blocking mode", errno);
277         }
278       } catch (...) {
279         ::close(pipeFds_[0]);
280         ::close(pipeFds_[1]);
281         throw;
282       }
283     }
284   }
285
286   ~NotificationQueue() {
287     if (eventfd_ >= 0) {
288       ::close(eventfd_);
289       eventfd_ = -1;
290     }
291     if (pipeFds_[0] >= 0) {
292       ::close(pipeFds_[0]);
293       pipeFds_[0] = -1;
294     }
295     if (pipeFds_[1] >= 0) {
296       ::close(pipeFds_[1]);
297       pipeFds_[1] = -1;
298     }
299   }
300
301   /**
302    * Set the advisory maximum queue size.
303    *
304    * This maximum queue size affects calls to tryPutMessage().  Message
305    * producers can still use the putMessage() call to unconditionally put a
306    * message on the queue, ignoring the configured maximum queue size.  This
307    * can cause the queue size to exceed the configured maximum.
308    */
309   void setMaxQueueSize(uint32_t max) {
310     advisoryMaxQueueSize_ = max;
311   }
312
313   /**
314    * Attempt to put a message on the queue if the queue is not already full.
315    *
316    * If the queue is full, a std::overflow_error will be thrown.  The
317    * setMaxQueueSize() function controls the maximum queue size.
318    *
319    * If the queue is currently draining, an std::runtime_error will be thrown.
320    *
321    * This method may contend briefly on a spinlock if many threads are
322    * concurrently accessing the queue, but for all intents and purposes it will
323    * immediately place the message on the queue and return.
324    *
325    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
326    * may throw any other exception thrown by the MessageT move/copy
327    * constructor.
328    */
329   void tryPutMessage(MessageT&& message) {
330     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
331   }
332   void tryPutMessage(const MessageT& message) {
333     putMessageImpl(message, advisoryMaxQueueSize_);
334   }
335
336   /**
337    * No-throw versions of the above.  Instead returns true on success, false on
338    * failure.
339    *
340    * Only std::overflow_error (the common exception case) and std::runtime_error
341    * (which indicates that the queue is being drained) are prevented from being
342    * thrown. User code must still catch std::bad_alloc errors.
343    */
344   bool tryPutMessageNoThrow(MessageT&& message) {
345     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
346   }
347   bool tryPutMessageNoThrow(const MessageT& message) {
348     return putMessageImpl(message, advisoryMaxQueueSize_, false);
349   }
350
351   /**
352    * Unconditionally put a message on the queue.
353    *
354    * This method is like tryPutMessage(), but ignores the maximum queue size
355    * and always puts the message on the queue, even if the maximum queue size
356    * would be exceeded.
357    *
358    * putMessage() may throw
359    *   - std::bad_alloc if memory allocation fails, and may
360    *   - std::runtime_error if the queue is currently draining
361    *   - any other exception thrown by the MessageT move/copy constructor.
362    */
363   void putMessage(MessageT&& message) {
364     putMessageImpl(std::move(message), 0);
365   }
366   void putMessage(const MessageT& message) {
367     putMessageImpl(message, 0);
368   }
369
370   /**
371    * Put several messages on the queue.
372    */
373   template<typename InputIteratorT>
374   void putMessages(InputIteratorT first, InputIteratorT last) {
375     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
376       IterCategory;
377     putMessagesImpl(first, last, IterCategory());
378   }
379
380   /**
381    * Try to immediately pull a message off of the queue, without blocking.
382    *
383    * If a message is immediately available, the result parameter will be
384    * updated to contain the message contents and true will be returned.
385    *
386    * If no message is available, false will be returned and result will be left
387    * unmodified.
388    */
389   bool tryConsume(MessageT& result) {
390     checkPid();
391
392     try {
393
394       folly::SpinLockGuard g(spinlock_);
395
396       if (UNLIKELY(queue_.empty())) {
397         return false;
398       }
399
400       auto data = std::move(queue_.front());
401       result = data.first;
402       RequestContext::setContext(data.second);
403
404       queue_.pop_front();
405     } catch (...) {
406       // Handle an exception if the assignment operator happens to throw.
407       // We consumed an event but weren't able to pop the message off the
408       // queue.  Signal the event again since the message is still in the
409       // queue.
410       signalEvent(1);
411       throw;
412     }
413
414     return true;
415   }
416
417   int size() {
418     folly::SpinLockGuard g(spinlock_);
419     return queue_.size();
420   }
421
422   /**
423    * Check that the NotificationQueue is being used from the correct process.
424    *
425    * If you create a NotificationQueue in one process, then fork, and try to
426    * send messages to the queue from the child process, you're going to have a
427    * bad time.  Unfortunately users have (accidentally) run into this.
428    *
429    * Because we use an eventfd/pipe, the child process can actually signal the
430    * parent process that an event is ready.  However, it can't put anything on
431    * the parent's queue, so the parent wakes up and finds an empty queue.  This
432    * check ensures that we catch the problem in the misbehaving child process
433    * code, and crash before signalling the parent process.
434    */
435   void checkPid() const {
436     CHECK_EQ(pid_, getpid());
437   }
438
439  private:
440   // Forbidden copy constructor and assignment operator
441   NotificationQueue(NotificationQueue const &) = delete;
442   NotificationQueue& operator=(NotificationQueue const &) = delete;
443
444   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
445     DCHECK(0 == spinlock_.trylock());
446     if (maxSize > 0 && queue_.size() >= maxSize) {
447       if (throws) {
448         throw std::overflow_error("unable to add message to NotificationQueue: "
449                                   "queue is full");
450       }
451       return false;
452     }
453     return true;
454   }
455
456   inline bool checkDraining(bool throws=true) {
457     if (UNLIKELY(draining_ && throws)) {
458       throw std::runtime_error("queue is draining, cannot add message");
459     }
460     return draining_;
461   }
462
463   inline void signalEvent(size_t numAdded = 1) const {
464     static const uint8_t kPipeMessage[] = {
465       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
466     };
467
468     ssize_t bytes_written = 0;
469     ssize_t bytes_expected = 0;
470     if (eventfd_ >= 0) {
471       // eventfd(2) dictates that we must write a 64-bit integer
472       uint64_t numAdded64(numAdded);
473       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
474       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
475     } else {
476       // pipe semantics, add one message for each numAdded
477       bytes_expected = numAdded;
478       do {
479         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
480         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
481         if (rc < 0) {
482           // TODO: if the pipe is full, write will fail with EAGAIN.
483           // See task #1044651 for how this could be handled
484           break;
485         }
486         numAdded -= rc;
487         bytes_written += rc;
488       } while (numAdded > 0);
489     }
490     if (bytes_written != bytes_expected) {
491       folly::throwSystemError("failed to signal NotificationQueue after "
492                               "write", errno);
493     }
494   }
495
496   bool tryConsumeEvent() {
497     uint64_t value = 0;
498     ssize_t rc = -1;
499     if (eventfd_ >= 0) {
500       rc = ::read(eventfd_, &value, sizeof(value));
501     } else {
502       uint8_t value8;
503       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
504       value = value8;
505     }
506     if (rc < 0) {
507       // EAGAIN should pretty much be the only error we can ever get.
508       // This means someone else already processed the only available message.
509       CHECK_EQ(errno, EAGAIN);
510       return false;
511     }
512     assert(value == 1);
513     return true;
514   }
515
516   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
517     checkPid();
518     bool signal = false;
519     {
520       folly::SpinLockGuard g(spinlock_);
521       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
522         return false;
523       }
524       // We only need to signal an event if not all consumers are
525       // awake.
526       if (numActiveConsumers_ < numConsumers_) {
527         signal = true;
528       }
529       queue_.emplace_back(std::move(message), RequestContext::saveContext());
530     }
531     if (signal) {
532       signalEvent();
533     }
534     return true;
535   }
536
537   bool putMessageImpl(
538     const MessageT& message, size_t maxSize, bool throws=true) {
539     checkPid();
540     bool signal = false;
541     {
542       folly::SpinLockGuard g(spinlock_);
543       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
544         return false;
545       }
546       if (numActiveConsumers_ < numConsumers_) {
547         signal = true;
548       }
549       queue_.emplace_back(message, RequestContext::saveContext());
550     }
551     if (signal) {
552       signalEvent();
553     }
554     return true;
555   }
556
557   template<typename InputIteratorT>
558   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
559                        std::input_iterator_tag) {
560     checkPid();
561     bool signal = false;
562     size_t numAdded = 0;
563     {
564       folly::SpinLockGuard g(spinlock_);
565       checkDraining();
566       while (first != last) {
567         queue_.emplace_back(*first, RequestContext::saveContext());
568         ++first;
569         ++numAdded;
570       }
571       if (numActiveConsumers_ < numConsumers_) {
572         signal = true;
573       }
574     }
575     if (signal) {
576       signalEvent();
577     }
578   }
579
580   mutable folly::SpinLock spinlock_;
581   int eventfd_;
582   int pipeFds_[2]; // to fallback to on older/non-linux systems
583   uint32_t advisoryMaxQueueSize_;
584   pid_t pid_;
585   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
586   int numConsumers_{0};
587   std::atomic<int> numActiveConsumers_{0};
588   bool draining_{false};
589 };
590
591 template<typename MessageT>
592 void NotificationQueue<MessageT>::Consumer::destroy() {
593   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
594   // will be non-nullptr.  Mark the value that it points to, so that
595   // handlerReady() will know the callback is destroyed, and that it cannot
596   // access any member variables anymore.
597   if (destroyedFlagPtr_) {
598     *destroyedFlagPtr_ = true;
599   }
600   stopConsuming();
601   DelayedDestruction::destroy();
602 }
603
604 template<typename MessageT>
605 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
606     noexcept {
607   consumeMessages(false);
608 }
609
610 template<typename MessageT>
611 void NotificationQueue<MessageT>::Consumer::consumeMessages(
612     bool isDrain, size_t* numConsumed) noexcept {
613   DestructorGuard dg(this);
614   uint32_t numProcessed = 0;
615   bool firstRun = true;
616   setActive(true);
617   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
618   SCOPE_EXIT {
619     if (numConsumed != nullptr) {
620       *numConsumed = numProcessed;
621     }
622   };
623   while (true) {
624     // Try to decrement the eventfd.
625     //
626     // The eventfd is only used to wake up the consumer - there may or
627     // may not actually be an event available (another consumer may
628     // have read it).  We don't really care, we only care about
629     // emptying the queue.
630     if (!isDrain && firstRun) {
631       queue_->tryConsumeEvent();
632       firstRun = false;
633     }
634
635     // Now pop the message off of the queue.
636     //
637     // We have to manually acquire and release the spinlock here, rather than
638     // using SpinLockHolder since the MessageT has to be constructed while
639     // holding the spinlock and available after we release it.  SpinLockHolder
640     // unfortunately doesn't provide a release() method.  (We can't construct
641     // MessageT first since we have no guarantee that MessageT has a default
642     // constructor.
643     queue_->spinlock_.lock();
644     bool locked = true;
645
646     try {
647       if (UNLIKELY(queue_->queue_.empty())) {
648         // If there is no message, we've reached the end of the queue, return.
649         setActive(false);
650         queue_->spinlock_.unlock();
651         return;
652       }
653
654       // Pull a message off the queue.
655       auto& data = queue_->queue_.front();
656
657       MessageT msg(std::move(data.first));
658       auto old_ctx =
659         RequestContext::setContext(data.second);
660       queue_->queue_.pop_front();
661
662       // Check to see if the queue is empty now.
663       // We use this as an optimization to see if we should bother trying to
664       // loop again and read another message after invoking this callback.
665       bool wasEmpty = queue_->queue_.empty();
666       if (wasEmpty) {
667         setActive(false);
668       }
669
670       // Now unlock the spinlock before we invoke the callback.
671       queue_->spinlock_.unlock();
672       locked = false;
673
674       // Call the callback
675       bool callbackDestroyed = false;
676       CHECK(destroyedFlagPtr_ == nullptr);
677       destroyedFlagPtr_ = &callbackDestroyed;
678       messageAvailable(std::move(msg));
679       destroyedFlagPtr_ = nullptr;
680
681       RequestContext::setContext(old_ctx);
682
683       // If the callback was destroyed before it returned, we are done
684       if (callbackDestroyed) {
685         return;
686       }
687
688       // If the callback is no longer installed, we are done.
689       if (queue_ == nullptr) {
690         return;
691       }
692
693       // If we have hit maxReadAtOnce_, we are done.
694       ++numProcessed;
695       if (!isDrain && maxReadAtOnce_ > 0 &&
696           numProcessed >= maxReadAtOnce_) {
697         queue_->signalEvent(1);
698         return;
699       }
700
701       // If the queue was empty before we invoked the callback, it's probable
702       // that it is still empty now.  Just go ahead and return, rather than
703       // looping again and trying to re-read from the eventfd.  (If a new
704       // message had in fact arrived while we were invoking the callback, we
705       // will simply be woken up the next time around the event loop and will
706       // process the message then.)
707       if (wasEmpty) {
708         return;
709       }
710     } catch (const std::exception& ex) {
711       // This catch block is really just to handle the case where the MessageT
712       // constructor throws.  The messageAvailable() callback itself is
713       // declared as noexcept and should never throw.
714       //
715       // If the MessageT constructor does throw we try to handle it as best as
716       // we can, but we can't work miracles.  We will just ignore the error for
717       // now and return.  The next time around the event loop we will end up
718       // trying to read the message again.  If MessageT continues to throw we
719       // will never make forward progress and will keep trying each time around
720       // the event loop.
721       if (locked) {
722         // Unlock the spinlock.
723         queue_->spinlock_.unlock();
724
725         // Push a notification back on the eventfd since we didn't actually
726         // read the message off of the queue.
727         if (!isDrain) {
728           queue_->signalEvent(1);
729         }
730       }
731
732       return;
733     }
734   }
735 }
736
737 template<typename MessageT>
738 void NotificationQueue<MessageT>::Consumer::init(
739     EventBase* eventBase,
740     NotificationQueue* queue) {
741   assert(eventBase->isInEventBaseThread());
742   assert(queue_ == nullptr);
743   assert(!isHandlerRegistered());
744   queue->checkPid();
745
746   base_ = eventBase;
747
748   queue_ = queue;
749
750   {
751     folly::SpinLockGuard g(queue_->spinlock_);
752     queue_->numConsumers_++;
753   }
754   queue_->signalEvent();
755
756   if (queue_->eventfd_ >= 0) {
757     initHandler(eventBase, queue_->eventfd_);
758   } else {
759     initHandler(eventBase, queue_->pipeFds_[0]);
760   }
761 }
762
763 template<typename MessageT>
764 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
765   if (queue_ == nullptr) {
766     assert(!isHandlerRegistered());
767     return;
768   }
769
770   {
771     folly::SpinLockGuard g(queue_->spinlock_);
772     queue_->numConsumers_--;
773     setActive(false);
774   }
775
776   assert(isHandlerRegistered());
777   unregisterHandler();
778   detachEventBase();
779   queue_ = nullptr;
780 }
781
782 template<typename MessageT>
783 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
784     size_t* numConsumed) noexcept {
785   DestructorGuard dg(this);
786   {
787     folly::SpinLockGuard g(queue_->spinlock_);
788     if (queue_->draining_) {
789       return false;
790     }
791     queue_->draining_ = true;
792   }
793   consumeMessages(true, numConsumed);
794   {
795     folly::SpinLockGuard g(queue_->spinlock_);
796     queue_->draining_ = false;
797   }
798   return true;
799 }
800
801 } // folly