9eee132bc7b6b2063d1e2479eed59dd54b251b1f
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22
23 #include <algorithm>
24 #include <deque>
25 #include <iterator>
26 #include <memory>
27 #include <stdexcept>
28 #include <utility>
29
30 #include <folly/io/async/EventBase.h>
31 #include <folly/io/async/EventHandler.h>
32 #include <folly/io/async/DelayedDestruction.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/Likely.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/SpinLock.h>
37
38 #include <glog/logging.h>
39
40 #if __linux__ && !__ANDROID__
41 #define FOLLY_HAVE_EVENTFD
42 #include <folly/io/async/EventFDWrapper.h>
43 #endif
44
45 namespace folly {
46
47 /**
48  * A producer-consumer queue for passing messages between EventBase threads.
49  *
50  * Messages can be added to the queue from any thread.  Multiple consumers may
51  * listen to the queue from multiple EventBase threads.
52  *
53  * A NotificationQueue may not be destroyed while there are still consumers
54  * registered to receive events from the queue.  It is the user's
55  * responsibility to ensure that all consumers are unregistered before the
56  * queue is destroyed.
57  *
58  * MessageT should be MoveConstructible (i.e., must support either a move
59  * constructor or a copy constructor, or both).  Ideally it's move constructor
60  * (or copy constructor if no move constructor is provided) should never throw
61  * exceptions.  If the constructor may throw, the consumers could end up
62  * spinning trying to move a message off the queue and failing, and then
63  * retrying.
64  */
65 template<typename MessageT>
66 class NotificationQueue {
67  public:
68   /**
69    * A callback interface for consuming messages from the queue as they arrive.
70    */
71   class Consumer : public DelayedDestruction, private EventHandler {
72    public:
73     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
74
75     Consumer()
76       : queue_(nullptr),
77         destroyedFlagPtr_(nullptr),
78         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
79
80     // create a consumer in-place, without the need to build new class
81     template <typename TCallback>
82     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
83         TCallback&& callback);
84
85     /**
86      * messageAvailable() will be invoked whenever a new
87      * message is available from the pipe.
88      */
89     virtual void messageAvailable(MessageT&& message) = 0;
90
91     /**
92      * Begin consuming messages from the specified queue.
93      *
94      * messageAvailable() will be called whenever a message is available.  This
95      * consumer will continue to consume messages until stopConsuming() is
96      * called.
97      *
98      * A Consumer may only consume messages from a single NotificationQueue at
99      * a time.  startConsuming() should not be called if this consumer is
100      * already consuming.
101      */
102     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
103       init(eventBase, queue);
104       registerHandler(READ | PERSIST);
105     }
106
107     /**
108      * Same as above but registers this event handler as internal so that it
109      * doesn't count towards the pending reader count for the IOLoop.
110      */
111     void startConsumingInternal(
112         EventBase* eventBase, NotificationQueue* queue) {
113       init(eventBase, queue);
114       registerInternalHandler(READ | PERSIST);
115     }
116
117     /**
118      * Stop consuming messages.
119      *
120      * startConsuming() may be called again to resume consumption of messages
121      * at a later point in time.
122      */
123     void stopConsuming();
124
125     /**
126      * Consume messages off the queue until it is empty. No messages may be
127      * added to the queue while it is draining, so that the process is bounded.
128      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
129      * and tryPutMessageNoThrow will return false.
130      *
131      * @returns true if the queue was drained, false otherwise. In practice,
132      * this will only fail if someone else is already draining the queue.
133      */
134     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
135
136     /**
137      * Get the NotificationQueue that this consumer is currently consuming
138      * messages from.  Returns nullptr if the consumer is not currently
139      * consuming events from any queue.
140      */
141     NotificationQueue* getCurrentQueue() const {
142       return queue_;
143     }
144
145     /**
146      * Set a limit on how many messages this consumer will read each iteration
147      * around the event loop.
148      *
149      * This helps rate-limit how much work the Consumer will do each event loop
150      * iteration, to prevent it from starving other event handlers.
151      *
152      * A limit of 0 means no limit will be enforced.  If unset, the limit
153      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
154      */
155     void setMaxReadAtOnce(uint32_t maxAtOnce) {
156       maxReadAtOnce_ = maxAtOnce;
157     }
158     uint32_t getMaxReadAtOnce() const {
159       return maxReadAtOnce_;
160     }
161
162     EventBase* getEventBase() {
163       return base_;
164     }
165
166     void handlerReady(uint16_t events) noexcept override;
167
168    protected:
169
170     void destroy() override;
171
172     virtual ~Consumer() {}
173
174    private:
175     /**
176      * Consume messages off the the queue until
177      *   - the queue is empty (1), or
178      *   - until the consumer is destroyed, or
179      *   - until the consumer is uninstalled, or
180      *   - an exception is thrown in the course of dequeueing, or
181      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
182      *
183      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
184      */
185     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
186
187     void setActive(bool active, bool shouldLock = false) {
188       if (!queue_) {
189         active_ = active;
190         return;
191       }
192       if (shouldLock) {
193         queue_->spinlock_.lock();
194       }
195       if (!active_ && active) {
196         ++queue_->numActiveConsumers_;
197       } else if (active_ && !active) {
198         --queue_->numActiveConsumers_;
199       }
200       active_ = active;
201       if (shouldLock) {
202         queue_->spinlock_.unlock();
203       }
204     }
205     void init(EventBase* eventBase, NotificationQueue* queue);
206
207     NotificationQueue* queue_;
208     bool* destroyedFlagPtr_;
209     uint32_t maxReadAtOnce_;
210     EventBase* base_;
211     bool active_{false};
212   };
213
214   enum class FdType {
215     PIPE,
216 #ifdef FOLLY_HAVE_EVENTFD
217     EVENTFD,
218 #endif
219   };
220
221   /**
222    * Create a new NotificationQueue.
223    *
224    * If the maxSize parameter is specified, this sets the maximum queue size
225    * that will be enforced by tryPutMessage().  (This size is advisory, and may
226    * be exceeded if producers explicitly use putMessage() instead of
227    * tryPutMessage().)
228    *
229    * The fdType parameter determines the type of file descriptor used
230    * internally to signal message availability.  The default (eventfd) is
231    * preferable for performance and because it won't fail when the queue gets
232    * too long.  It is not available on on older and non-linux kernels, however.
233    * In this case the code will fall back to using a pipe, the parameter is
234    * mostly for testing purposes.
235    */
236   explicit NotificationQueue(uint32_t maxSize = 0,
237 #ifdef FOLLY_HAVE_EVENTFD
238                              FdType fdType = FdType::EVENTFD)
239 #else
240                              FdType fdType = FdType::PIPE)
241 #endif
242     : eventfd_(-1),
243       pipeFds_{-1, -1},
244       advisoryMaxQueueSize_(maxSize),
245       pid_(getpid()),
246       queue_() {
247
248     RequestContext::saveContext();
249
250 #ifdef FOLLY_HAVE_EVENTFD
251     if (fdType == FdType::EVENTFD) {
252       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
253       if (eventfd_ == -1) {
254         if (errno == ENOSYS || errno == EINVAL) {
255           // eventfd not availalble
256           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
257                      << errno << ", falling back to pipe mode (is your kernel "
258                      << "> 2.6.30?)";
259           fdType = FdType::PIPE;
260         } else {
261           // some other error
262           folly::throwSystemError("Failed to create eventfd for "
263                                   "NotificationQueue", errno);
264         }
265       }
266     }
267 #endif
268     if (fdType == FdType::PIPE) {
269       if (pipe(pipeFds_)) {
270         folly::throwSystemError("Failed to create pipe for NotificationQueue",
271                                 errno);
272       }
273       try {
274         // put both ends of the pipe into non-blocking mode
275         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
276           folly::throwSystemError("failed to put NotificationQueue pipe read "
277                                   "endpoint into non-blocking mode", errno);
278         }
279         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
280           folly::throwSystemError("failed to put NotificationQueue pipe write "
281                                   "endpoint into non-blocking mode", errno);
282         }
283       } catch (...) {
284         ::close(pipeFds_[0]);
285         ::close(pipeFds_[1]);
286         throw;
287       }
288     }
289   }
290
291   ~NotificationQueue() {
292     if (eventfd_ >= 0) {
293       ::close(eventfd_);
294       eventfd_ = -1;
295     }
296     if (pipeFds_[0] >= 0) {
297       ::close(pipeFds_[0]);
298       pipeFds_[0] = -1;
299     }
300     if (pipeFds_[1] >= 0) {
301       ::close(pipeFds_[1]);
302       pipeFds_[1] = -1;
303     }
304   }
305
306   /**
307    * Set the advisory maximum queue size.
308    *
309    * This maximum queue size affects calls to tryPutMessage().  Message
310    * producers can still use the putMessage() call to unconditionally put a
311    * message on the queue, ignoring the configured maximum queue size.  This
312    * can cause the queue size to exceed the configured maximum.
313    */
314   void setMaxQueueSize(uint32_t max) {
315     advisoryMaxQueueSize_ = max;
316   }
317
318   /**
319    * Attempt to put a message on the queue if the queue is not already full.
320    *
321    * If the queue is full, a std::overflow_error will be thrown.  The
322    * setMaxQueueSize() function controls the maximum queue size.
323    *
324    * If the queue is currently draining, an std::runtime_error will be thrown.
325    *
326    * This method may contend briefly on a spinlock if many threads are
327    * concurrently accessing the queue, but for all intents and purposes it will
328    * immediately place the message on the queue and return.
329    *
330    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
331    * may throw any other exception thrown by the MessageT move/copy
332    * constructor.
333    */
334   void tryPutMessage(MessageT&& message) {
335     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
336   }
337   void tryPutMessage(const MessageT& message) {
338     putMessageImpl(message, advisoryMaxQueueSize_);
339   }
340
341   /**
342    * No-throw versions of the above.  Instead returns true on success, false on
343    * failure.
344    *
345    * Only std::overflow_error (the common exception case) and std::runtime_error
346    * (which indicates that the queue is being drained) are prevented from being
347    * thrown. User code must still catch std::bad_alloc errors.
348    */
349   bool tryPutMessageNoThrow(MessageT&& message) {
350     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
351   }
352   bool tryPutMessageNoThrow(const MessageT& message) {
353     return putMessageImpl(message, advisoryMaxQueueSize_, false);
354   }
355
356   /**
357    * Unconditionally put a message on the queue.
358    *
359    * This method is like tryPutMessage(), but ignores the maximum queue size
360    * and always puts the message on the queue, even if the maximum queue size
361    * would be exceeded.
362    *
363    * putMessage() may throw
364    *   - std::bad_alloc if memory allocation fails, and may
365    *   - std::runtime_error if the queue is currently draining
366    *   - any other exception thrown by the MessageT move/copy constructor.
367    */
368   void putMessage(MessageT&& message) {
369     putMessageImpl(std::move(message), 0);
370   }
371   void putMessage(const MessageT& message) {
372     putMessageImpl(message, 0);
373   }
374
375   /**
376    * Put several messages on the queue.
377    */
378   template<typename InputIteratorT>
379   void putMessages(InputIteratorT first, InputIteratorT last) {
380     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
381       IterCategory;
382     putMessagesImpl(first, last, IterCategory());
383   }
384
385   /**
386    * Try to immediately pull a message off of the queue, without blocking.
387    *
388    * If a message is immediately available, the result parameter will be
389    * updated to contain the message contents and true will be returned.
390    *
391    * If no message is available, false will be returned and result will be left
392    * unmodified.
393    */
394   bool tryConsume(MessageT& result) {
395     checkPid();
396
397     try {
398
399       folly::SpinLockGuard g(spinlock_);
400
401       if (UNLIKELY(queue_.empty())) {
402         return false;
403       }
404
405       auto data = std::move(queue_.front());
406       result = data.first;
407       RequestContext::setContext(data.second);
408
409       queue_.pop_front();
410     } catch (...) {
411       // Handle an exception if the assignment operator happens to throw.
412       // We consumed an event but weren't able to pop the message off the
413       // queue.  Signal the event again since the message is still in the
414       // queue.
415       signalEvent(1);
416       throw;
417     }
418
419     return true;
420   }
421
422   size_t size() {
423     folly::SpinLockGuard g(spinlock_);
424     return queue_.size();
425   }
426
427   /**
428    * Check that the NotificationQueue is being used from the correct process.
429    *
430    * If you create a NotificationQueue in one process, then fork, and try to
431    * send messages to the queue from the child process, you're going to have a
432    * bad time.  Unfortunately users have (accidentally) run into this.
433    *
434    * Because we use an eventfd/pipe, the child process can actually signal the
435    * parent process that an event is ready.  However, it can't put anything on
436    * the parent's queue, so the parent wakes up and finds an empty queue.  This
437    * check ensures that we catch the problem in the misbehaving child process
438    * code, and crash before signalling the parent process.
439    */
440   void checkPid() const {
441     CHECK_EQ(pid_, getpid());
442   }
443
444  private:
445   // Forbidden copy constructor and assignment operator
446   NotificationQueue(NotificationQueue const &) = delete;
447   NotificationQueue& operator=(NotificationQueue const &) = delete;
448
449   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
450     DCHECK(0 == spinlock_.trylock());
451     if (maxSize > 0 && queue_.size() >= maxSize) {
452       if (throws) {
453         throw std::overflow_error("unable to add message to NotificationQueue: "
454                                   "queue is full");
455       }
456       return false;
457     }
458     return true;
459   }
460
461   inline bool checkDraining(bool throws=true) {
462     if (UNLIKELY(draining_ && throws)) {
463       throw std::runtime_error("queue is draining, cannot add message");
464     }
465     return draining_;
466   }
467
468   inline void signalEvent(size_t numAdded = 1) const {
469     static const uint8_t kPipeMessage[] = {
470       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
471     };
472
473     ssize_t bytes_written = 0;
474     ssize_t bytes_expected = 0;
475     if (eventfd_ >= 0) {
476       // eventfd(2) dictates that we must write a 64-bit integer
477       uint64_t numAdded64(numAdded);
478       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
479       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
480     } else {
481       // pipe semantics, add one message for each numAdded
482       bytes_expected = numAdded;
483       do {
484         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
485         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
486         if (rc < 0) {
487           // TODO: if the pipe is full, write will fail with EAGAIN.
488           // See task #1044651 for how this could be handled
489           break;
490         }
491         numAdded -= rc;
492         bytes_written += rc;
493       } while (numAdded > 0);
494     }
495     if (bytes_written != bytes_expected) {
496       folly::throwSystemError("failed to signal NotificationQueue after "
497                               "write", errno);
498     }
499   }
500
501   bool tryConsumeEvent() {
502     uint64_t value = 0;
503     ssize_t rc = -1;
504     if (eventfd_ >= 0) {
505       rc = ::read(eventfd_, &value, sizeof(value));
506     } else {
507       uint8_t value8;
508       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
509       value = value8;
510     }
511     if (rc < 0) {
512       // EAGAIN should pretty much be the only error we can ever get.
513       // This means someone else already processed the only available message.
514       CHECK_EQ(errno, EAGAIN);
515       return false;
516     }
517     assert(value == 1);
518     return true;
519   }
520
521   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
522     checkPid();
523     bool signal = false;
524     {
525       folly::SpinLockGuard g(spinlock_);
526       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
527         return false;
528       }
529       // We only need to signal an event if not all consumers are
530       // awake.
531       if (numActiveConsumers_ < numConsumers_) {
532         signal = true;
533       }
534       queue_.emplace_back(std::move(message), RequestContext::saveContext());
535     }
536     if (signal) {
537       signalEvent();
538     }
539     return true;
540   }
541
542   bool putMessageImpl(
543     const MessageT& message, size_t maxSize, bool throws=true) {
544     checkPid();
545     bool signal = false;
546     {
547       folly::SpinLockGuard g(spinlock_);
548       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
549         return false;
550       }
551       if (numActiveConsumers_ < numConsumers_) {
552         signal = true;
553       }
554       queue_.emplace_back(message, RequestContext::saveContext());
555     }
556     if (signal) {
557       signalEvent();
558     }
559     return true;
560   }
561
562   template<typename InputIteratorT>
563   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
564                        std::input_iterator_tag) {
565     checkPid();
566     bool signal = false;
567     size_t numAdded = 0;
568     {
569       folly::SpinLockGuard g(spinlock_);
570       checkDraining();
571       while (first != last) {
572         queue_.emplace_back(*first, RequestContext::saveContext());
573         ++first;
574         ++numAdded;
575       }
576       if (numActiveConsumers_ < numConsumers_) {
577         signal = true;
578       }
579     }
580     if (signal) {
581       signalEvent();
582     }
583   }
584
585   mutable folly::SpinLock spinlock_;
586   int eventfd_;
587   int pipeFds_[2]; // to fallback to on older/non-linux systems
588   uint32_t advisoryMaxQueueSize_;
589   pid_t pid_;
590   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
591   int numConsumers_{0};
592   std::atomic<int> numActiveConsumers_{0};
593   bool draining_{false};
594 };
595
596 template<typename MessageT>
597 void NotificationQueue<MessageT>::Consumer::destroy() {
598   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
599   // will be non-nullptr.  Mark the value that it points to, so that
600   // handlerReady() will know the callback is destroyed, and that it cannot
601   // access any member variables anymore.
602   if (destroyedFlagPtr_) {
603     *destroyedFlagPtr_ = true;
604   }
605   stopConsuming();
606   DelayedDestruction::destroy();
607 }
608
609 template<typename MessageT>
610 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
611     noexcept {
612   consumeMessages(false);
613 }
614
615 template<typename MessageT>
616 void NotificationQueue<MessageT>::Consumer::consumeMessages(
617     bool isDrain, size_t* numConsumed) noexcept {
618   DestructorGuard dg(this);
619   uint32_t numProcessed = 0;
620   bool firstRun = true;
621   setActive(true);
622   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
623   SCOPE_EXIT {
624     if (numConsumed != nullptr) {
625       *numConsumed = numProcessed;
626     }
627   };
628   while (true) {
629     // Try to decrement the eventfd.
630     //
631     // The eventfd is only used to wake up the consumer - there may or
632     // may not actually be an event available (another consumer may
633     // have read it).  We don't really care, we only care about
634     // emptying the queue.
635     if (!isDrain && firstRun) {
636       queue_->tryConsumeEvent();
637       firstRun = false;
638     }
639
640     // Now pop the message off of the queue.
641     //
642     // We have to manually acquire and release the spinlock here, rather than
643     // using SpinLockHolder since the MessageT has to be constructed while
644     // holding the spinlock and available after we release it.  SpinLockHolder
645     // unfortunately doesn't provide a release() method.  (We can't construct
646     // MessageT first since we have no guarantee that MessageT has a default
647     // constructor.
648     queue_->spinlock_.lock();
649     bool locked = true;
650
651     try {
652       if (UNLIKELY(queue_->queue_.empty())) {
653         // If there is no message, we've reached the end of the queue, return.
654         setActive(false);
655         queue_->spinlock_.unlock();
656         return;
657       }
658
659       // Pull a message off the queue.
660       auto& data = queue_->queue_.front();
661
662       MessageT msg(std::move(data.first));
663       auto old_ctx =
664         RequestContext::setContext(data.second);
665       queue_->queue_.pop_front();
666
667       // Check to see if the queue is empty now.
668       // We use this as an optimization to see if we should bother trying to
669       // loop again and read another message after invoking this callback.
670       bool wasEmpty = queue_->queue_.empty();
671       if (wasEmpty) {
672         setActive(false);
673       }
674
675       // Now unlock the spinlock before we invoke the callback.
676       queue_->spinlock_.unlock();
677       locked = false;
678
679       // Call the callback
680       bool callbackDestroyed = false;
681       CHECK(destroyedFlagPtr_ == nullptr);
682       destroyedFlagPtr_ = &callbackDestroyed;
683       messageAvailable(std::move(msg));
684       destroyedFlagPtr_ = nullptr;
685
686       RequestContext::setContext(old_ctx);
687
688       // If the callback was destroyed before it returned, we are done
689       if (callbackDestroyed) {
690         return;
691       }
692
693       // If the callback is no longer installed, we are done.
694       if (queue_ == nullptr) {
695         return;
696       }
697
698       // If we have hit maxReadAtOnce_, we are done.
699       ++numProcessed;
700       if (!isDrain && maxReadAtOnce_ > 0 &&
701           numProcessed >= maxReadAtOnce_) {
702         queue_->signalEvent(1);
703         return;
704       }
705
706       // If the queue was empty before we invoked the callback, it's probable
707       // that it is still empty now.  Just go ahead and return, rather than
708       // looping again and trying to re-read from the eventfd.  (If a new
709       // message had in fact arrived while we were invoking the callback, we
710       // will simply be woken up the next time around the event loop and will
711       // process the message then.)
712       if (wasEmpty) {
713         return;
714       }
715     } catch (const std::exception& ex) {
716       // This catch block is really just to handle the case where the MessageT
717       // constructor throws.  The messageAvailable() callback itself is
718       // declared as noexcept and should never throw.
719       //
720       // If the MessageT constructor does throw we try to handle it as best as
721       // we can, but we can't work miracles.  We will just ignore the error for
722       // now and return.  The next time around the event loop we will end up
723       // trying to read the message again.  If MessageT continues to throw we
724       // will never make forward progress and will keep trying each time around
725       // the event loop.
726       if (locked) {
727         // Unlock the spinlock.
728         queue_->spinlock_.unlock();
729
730         // Push a notification back on the eventfd since we didn't actually
731         // read the message off of the queue.
732         if (!isDrain) {
733           queue_->signalEvent(1);
734         }
735       }
736
737       return;
738     }
739   }
740 }
741
742 template<typename MessageT>
743 void NotificationQueue<MessageT>::Consumer::init(
744     EventBase* eventBase,
745     NotificationQueue* queue) {
746   assert(eventBase->isInEventBaseThread());
747   assert(queue_ == nullptr);
748   assert(!isHandlerRegistered());
749   queue->checkPid();
750
751   base_ = eventBase;
752
753   queue_ = queue;
754
755   {
756     folly::SpinLockGuard g(queue_->spinlock_);
757     queue_->numConsumers_++;
758   }
759   queue_->signalEvent();
760
761   if (queue_->eventfd_ >= 0) {
762     initHandler(eventBase, queue_->eventfd_);
763   } else {
764     initHandler(eventBase, queue_->pipeFds_[0]);
765   }
766 }
767
768 template<typename MessageT>
769 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
770   if (queue_ == nullptr) {
771     assert(!isHandlerRegistered());
772     return;
773   }
774
775   {
776     folly::SpinLockGuard g(queue_->spinlock_);
777     queue_->numConsumers_--;
778     setActive(false);
779   }
780
781   assert(isHandlerRegistered());
782   unregisterHandler();
783   detachEventBase();
784   queue_ = nullptr;
785 }
786
787 template<typename MessageT>
788 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
789     size_t* numConsumed) noexcept {
790   DestructorGuard dg(this);
791   {
792     folly::SpinLockGuard g(queue_->spinlock_);
793     if (queue_->draining_) {
794       return false;
795     }
796     queue_->draining_ = true;
797   }
798   consumeMessages(true, numConsumed);
799   {
800     folly::SpinLockGuard g(queue_->spinlock_);
801     queue_->draining_ = false;
802   }
803   return true;
804 }
805
806 /**
807  * Creates a NotificationQueue::Consumer wrapping a function object
808  * Modeled after AsyncTimeout::make
809  *
810  */
811
812 namespace detail {
813
814 template <typename MessageT, typename TCallback>
815 struct notification_queue_consumer_wrapper
816     : public NotificationQueue<MessageT>::Consumer {
817
818   template <typename UCallback>
819   explicit notification_queue_consumer_wrapper(UCallback&& callback)
820       : callback_(std::forward<UCallback>(callback)) {}
821
822   // we are being stricter here and requiring noexcept for callback
823   void messageAvailable(MessageT&& message) override {
824     static_assert(
825       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
826       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
827     );
828
829     callback_(std::forward<MessageT>(message));
830   }
831
832  private:
833   TCallback callback_;
834 };
835
836 } // namespace detail
837
838 template <typename MessageT>
839 template <typename TCallback>
840 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
841                 DelayedDestruction::Destructor>
842 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
843   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
844                          DelayedDestruction::Destructor>(
845       new detail::notification_queue_consumer_wrapper<
846           MessageT,
847           typename std::decay<TCallback>::type>(
848           std::forward<TCallback>(callback)));
849 }
850
851 } // folly