change CHECK to LOG(ERROR) when pipe read error on NotificationQueue
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22
23 #include <algorithm>
24 #include <deque>
25 #include <iterator>
26 #include <memory>
27 #include <stdexcept>
28 #include <utility>
29
30 #include <folly/io/async/EventBase.h>
31 #include <folly/io/async/EventHandler.h>
32 #include <folly/io/async/DelayedDestruction.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/Likely.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/SpinLock.h>
37
38 #include <glog/logging.h>
39
40 #if __linux__ && !__ANDROID__
41 #define FOLLY_HAVE_EVENTFD
42 #include <folly/io/async/EventFDWrapper.h>
43 #endif
44
45 namespace folly {
46
47 /**
48  * A producer-consumer queue for passing messages between EventBase threads.
49  *
50  * Messages can be added to the queue from any thread.  Multiple consumers may
51  * listen to the queue from multiple EventBase threads.
52  *
53  * A NotificationQueue may not be destroyed while there are still consumers
54  * registered to receive events from the queue.  It is the user's
55  * responsibility to ensure that all consumers are unregistered before the
56  * queue is destroyed.
57  *
58  * MessageT should be MoveConstructible (i.e., must support either a move
59  * constructor or a copy constructor, or both).  Ideally it's move constructor
60  * (or copy constructor if no move constructor is provided) should never throw
61  * exceptions.  If the constructor may throw, the consumers could end up
62  * spinning trying to move a message off the queue and failing, and then
63  * retrying.
64  */
65 template<typename MessageT>
66 class NotificationQueue {
67  public:
68   /**
69    * A callback interface for consuming messages from the queue as they arrive.
70    */
71   class Consumer : public DelayedDestruction, private EventHandler {
72    public:
73     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
74
75     Consumer()
76       : queue_(nullptr),
77         destroyedFlagPtr_(nullptr),
78         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
79
80     // create a consumer in-place, without the need to build new class
81     template <typename TCallback>
82     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
83         TCallback&& callback);
84
85     /**
86      * messageAvailable() will be invoked whenever a new
87      * message is available from the pipe.
88      */
89     virtual void messageAvailable(MessageT&& message) = 0;
90
91     /**
92      * Begin consuming messages from the specified queue.
93      *
94      * messageAvailable() will be called whenever a message is available.  This
95      * consumer will continue to consume messages until stopConsuming() is
96      * called.
97      *
98      * A Consumer may only consume messages from a single NotificationQueue at
99      * a time.  startConsuming() should not be called if this consumer is
100      * already consuming.
101      */
102     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
103       init(eventBase, queue);
104       registerHandler(READ | PERSIST);
105     }
106
107     /**
108      * Same as above but registers this event handler as internal so that it
109      * doesn't count towards the pending reader count for the IOLoop.
110      */
111     void startConsumingInternal(
112         EventBase* eventBase, NotificationQueue* queue) {
113       init(eventBase, queue);
114       registerInternalHandler(READ | PERSIST);
115     }
116
117     /**
118      * Stop consuming messages.
119      *
120      * startConsuming() may be called again to resume consumption of messages
121      * at a later point in time.
122      */
123     void stopConsuming();
124
125     /**
126      * Consume messages off the queue until it is empty. No messages may be
127      * added to the queue while it is draining, so that the process is bounded.
128      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
129      * and tryPutMessageNoThrow will return false.
130      *
131      * @returns true if the queue was drained, false otherwise. In practice,
132      * this will only fail if someone else is already draining the queue.
133      */
134     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
135
136     /**
137      * Get the NotificationQueue that this consumer is currently consuming
138      * messages from.  Returns nullptr if the consumer is not currently
139      * consuming events from any queue.
140      */
141     NotificationQueue* getCurrentQueue() const {
142       return queue_;
143     }
144
145     /**
146      * Set a limit on how many messages this consumer will read each iteration
147      * around the event loop.
148      *
149      * This helps rate-limit how much work the Consumer will do each event loop
150      * iteration, to prevent it from starving other event handlers.
151      *
152      * A limit of 0 means no limit will be enforced.  If unset, the limit
153      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
154      */
155     void setMaxReadAtOnce(uint32_t maxAtOnce) {
156       maxReadAtOnce_ = maxAtOnce;
157     }
158     uint32_t getMaxReadAtOnce() const {
159       return maxReadAtOnce_;
160     }
161
162     EventBase* getEventBase() {
163       return base_;
164     }
165
166     void handlerReady(uint16_t events) noexcept override;
167
168    protected:
169
170     void destroy() override;
171
172     virtual ~Consumer() {}
173
174    private:
175     /**
176      * Consume messages off the the queue until
177      *   - the queue is empty (1), or
178      *   - until the consumer is destroyed, or
179      *   - until the consumer is uninstalled, or
180      *   - an exception is thrown in the course of dequeueing, or
181      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
182      *
183      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
184      */
185     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
186
187     void setActive(bool active, bool shouldLock = false) {
188       if (!queue_) {
189         active_ = active;
190         return;
191       }
192       if (shouldLock) {
193         queue_->spinlock_.lock();
194       }
195       if (!active_ && active) {
196         ++queue_->numActiveConsumers_;
197       } else if (active_ && !active) {
198         --queue_->numActiveConsumers_;
199       }
200       active_ = active;
201       if (shouldLock) {
202         queue_->spinlock_.unlock();
203       }
204     }
205     void init(EventBase* eventBase, NotificationQueue* queue);
206
207     NotificationQueue* queue_;
208     bool* destroyedFlagPtr_;
209     uint32_t maxReadAtOnce_;
210     EventBase* base_;
211     bool active_{false};
212   };
213
214   enum class FdType {
215     PIPE,
216 #ifdef FOLLY_HAVE_EVENTFD
217     EVENTFD,
218 #endif
219   };
220
221   /**
222    * Create a new NotificationQueue.
223    *
224    * If the maxSize parameter is specified, this sets the maximum queue size
225    * that will be enforced by tryPutMessage().  (This size is advisory, and may
226    * be exceeded if producers explicitly use putMessage() instead of
227    * tryPutMessage().)
228    *
229    * The fdType parameter determines the type of file descriptor used
230    * internally to signal message availability.  The default (eventfd) is
231    * preferable for performance and because it won't fail when the queue gets
232    * too long.  It is not available on on older and non-linux kernels, however.
233    * In this case the code will fall back to using a pipe, the parameter is
234    * mostly for testing purposes.
235    */
236   explicit NotificationQueue(uint32_t maxSize = 0,
237 #ifdef FOLLY_HAVE_EVENTFD
238                              FdType fdType = FdType::EVENTFD)
239 #else
240                              FdType fdType = FdType::PIPE)
241 #endif
242     : eventfd_(-1),
243       pipeFds_{-1, -1},
244       advisoryMaxQueueSize_(maxSize),
245       pid_(getpid()),
246       queue_() {
247
248     RequestContext::saveContext();
249
250 #ifdef FOLLY_HAVE_EVENTFD
251     if (fdType == FdType::EVENTFD) {
252       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
253       if (eventfd_ == -1) {
254         if (errno == ENOSYS || errno == EINVAL) {
255           // eventfd not availalble
256           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
257                      << errno << ", falling back to pipe mode (is your kernel "
258                      << "> 2.6.30?)";
259           fdType = FdType::PIPE;
260         } else {
261           // some other error
262           folly::throwSystemError("Failed to create eventfd for "
263                                   "NotificationQueue", errno);
264         }
265       }
266     }
267 #endif
268     if (fdType == FdType::PIPE) {
269       if (pipe(pipeFds_)) {
270         folly::throwSystemError("Failed to create pipe for NotificationQueue",
271                                 errno);
272       }
273       try {
274         // put both ends of the pipe into non-blocking mode
275         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
276           folly::throwSystemError("failed to put NotificationQueue pipe read "
277                                   "endpoint into non-blocking mode", errno);
278         }
279         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
280           folly::throwSystemError("failed to put NotificationQueue pipe write "
281                                   "endpoint into non-blocking mode", errno);
282         }
283       } catch (...) {
284         ::close(pipeFds_[0]);
285         ::close(pipeFds_[1]);
286         throw;
287       }
288     }
289   }
290
291   ~NotificationQueue() {
292     if (eventfd_ >= 0) {
293       ::close(eventfd_);
294       eventfd_ = -1;
295     }
296     if (pipeFds_[0] >= 0) {
297       ::close(pipeFds_[0]);
298       pipeFds_[0] = -1;
299     }
300     if (pipeFds_[1] >= 0) {
301       ::close(pipeFds_[1]);
302       pipeFds_[1] = -1;
303     }
304   }
305
306   /**
307    * Set the advisory maximum queue size.
308    *
309    * This maximum queue size affects calls to tryPutMessage().  Message
310    * producers can still use the putMessage() call to unconditionally put a
311    * message on the queue, ignoring the configured maximum queue size.  This
312    * can cause the queue size to exceed the configured maximum.
313    */
314   void setMaxQueueSize(uint32_t max) {
315     advisoryMaxQueueSize_ = max;
316   }
317
318   /**
319    * Attempt to put a message on the queue if the queue is not already full.
320    *
321    * If the queue is full, a std::overflow_error will be thrown.  The
322    * setMaxQueueSize() function controls the maximum queue size.
323    *
324    * If the queue is currently draining, an std::runtime_error will be thrown.
325    *
326    * This method may contend briefly on a spinlock if many threads are
327    * concurrently accessing the queue, but for all intents and purposes it will
328    * immediately place the message on the queue and return.
329    *
330    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
331    * may throw any other exception thrown by the MessageT move/copy
332    * constructor.
333    */
334   void tryPutMessage(MessageT&& message) {
335     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
336   }
337   void tryPutMessage(const MessageT& message) {
338     putMessageImpl(message, advisoryMaxQueueSize_);
339   }
340
341   /**
342    * No-throw versions of the above.  Instead returns true on success, false on
343    * failure.
344    *
345    * Only std::overflow_error (the common exception case) and std::runtime_error
346    * (which indicates that the queue is being drained) are prevented from being
347    * thrown. User code must still catch std::bad_alloc errors.
348    */
349   bool tryPutMessageNoThrow(MessageT&& message) {
350     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
351   }
352   bool tryPutMessageNoThrow(const MessageT& message) {
353     return putMessageImpl(message, advisoryMaxQueueSize_, false);
354   }
355
356   /**
357    * Unconditionally put a message on the queue.
358    *
359    * This method is like tryPutMessage(), but ignores the maximum queue size
360    * and always puts the message on the queue, even if the maximum queue size
361    * would be exceeded.
362    *
363    * putMessage() may throw
364    *   - std::bad_alloc if memory allocation fails, and may
365    *   - std::runtime_error if the queue is currently draining
366    *   - any other exception thrown by the MessageT move/copy constructor.
367    */
368   void putMessage(MessageT&& message) {
369     putMessageImpl(std::move(message), 0);
370   }
371   void putMessage(const MessageT& message) {
372     putMessageImpl(message, 0);
373   }
374
375   /**
376    * Put several messages on the queue.
377    */
378   template<typename InputIteratorT>
379   void putMessages(InputIteratorT first, InputIteratorT last) {
380     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
381       IterCategory;
382     putMessagesImpl(first, last, IterCategory());
383   }
384
385   /**
386    * Try to immediately pull a message off of the queue, without blocking.
387    *
388    * If a message is immediately available, the result parameter will be
389    * updated to contain the message contents and true will be returned.
390    *
391    * If no message is available, false will be returned and result will be left
392    * unmodified.
393    */
394   bool tryConsume(MessageT& result) {
395     checkPid();
396
397     try {
398
399       folly::SpinLockGuard g(spinlock_);
400
401       if (UNLIKELY(queue_.empty())) {
402         return false;
403       }
404
405       auto data = std::move(queue_.front());
406       result = data.first;
407       RequestContext::setContext(data.second);
408
409       queue_.pop_front();
410     } catch (...) {
411       // Handle an exception if the assignment operator happens to throw.
412       // We consumed an event but weren't able to pop the message off the
413       // queue.  Signal the event again since the message is still in the
414       // queue.
415       signalEvent(1);
416       throw;
417     }
418
419     return true;
420   }
421
422   size_t size() {
423     folly::SpinLockGuard g(spinlock_);
424     return queue_.size();
425   }
426
427   /**
428    * Check that the NotificationQueue is being used from the correct process.
429    *
430    * If you create a NotificationQueue in one process, then fork, and try to
431    * send messages to the queue from the child process, you're going to have a
432    * bad time.  Unfortunately users have (accidentally) run into this.
433    *
434    * Because we use an eventfd/pipe, the child process can actually signal the
435    * parent process that an event is ready.  However, it can't put anything on
436    * the parent's queue, so the parent wakes up and finds an empty queue.  This
437    * check ensures that we catch the problem in the misbehaving child process
438    * code, and crash before signalling the parent process.
439    */
440   void checkPid() const {
441     CHECK_EQ(pid_, getpid());
442   }
443
444  private:
445   // Forbidden copy constructor and assignment operator
446   NotificationQueue(NotificationQueue const &) = delete;
447   NotificationQueue& operator=(NotificationQueue const &) = delete;
448
449   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
450     DCHECK(0 == spinlock_.trylock());
451     if (maxSize > 0 && queue_.size() >= maxSize) {
452       if (throws) {
453         throw std::overflow_error("unable to add message to NotificationQueue: "
454                                   "queue is full");
455       }
456       return false;
457     }
458     return true;
459   }
460
461   inline bool checkDraining(bool throws=true) {
462     if (UNLIKELY(draining_ && throws)) {
463       throw std::runtime_error("queue is draining, cannot add message");
464     }
465     return draining_;
466   }
467
468   inline void signalEvent(size_t numAdded = 1) const {
469     static const uint8_t kPipeMessage[] = {
470       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
471     };
472
473     ssize_t bytes_written = 0;
474     ssize_t bytes_expected = 0;
475     if (eventfd_ >= 0) {
476       // eventfd(2) dictates that we must write a 64-bit integer
477       uint64_t numAdded64(numAdded);
478       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
479       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
480     } else {
481       // pipe semantics, add one message for each numAdded
482       bytes_expected = numAdded;
483       do {
484         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
485         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
486         if (rc < 0) {
487           // TODO: if the pipe is full, write will fail with EAGAIN.
488           // See task #1044651 for how this could be handled
489           break;
490         }
491         numAdded -= rc;
492         bytes_written += rc;
493       } while (numAdded > 0);
494     }
495     if (bytes_written != bytes_expected) {
496       folly::throwSystemError("failed to signal NotificationQueue after "
497                               "write", errno);
498     }
499   }
500
501   bool tryConsumeEvent() {
502     uint64_t value = 0;
503     ssize_t rc = -1;
504     if (eventfd_ >= 0) {
505       rc = ::read(eventfd_, &value, sizeof(value));
506     } else {
507       uint8_t value8;
508       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
509       value = value8;
510     }
511     if (rc < 0) {
512       // EAGAIN should pretty much be the only error we can ever get.
513       // This means someone else already processed the only available message.
514       if (rc != EAGAIN) {
515         LOG(ERROR) << "non-EAGAIN error returned on pipe read: " << errno;
516       }
517       return false;
518     }
519     assert(value == 1);
520     return true;
521   }
522
523   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
524     checkPid();
525     bool signal = false;
526     {
527       folly::SpinLockGuard g(spinlock_);
528       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
529         return false;
530       }
531       // We only need to signal an event if not all consumers are
532       // awake.
533       if (numActiveConsumers_ < numConsumers_) {
534         signal = true;
535       }
536       queue_.emplace_back(std::move(message), RequestContext::saveContext());
537     }
538     if (signal) {
539       signalEvent();
540     }
541     return true;
542   }
543
544   bool putMessageImpl(
545     const MessageT& message, size_t maxSize, bool throws=true) {
546     checkPid();
547     bool signal = false;
548     {
549       folly::SpinLockGuard g(spinlock_);
550       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
551         return false;
552       }
553       if (numActiveConsumers_ < numConsumers_) {
554         signal = true;
555       }
556       queue_.emplace_back(message, RequestContext::saveContext());
557     }
558     if (signal) {
559       signalEvent();
560     }
561     return true;
562   }
563
564   template<typename InputIteratorT>
565   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
566                        std::input_iterator_tag) {
567     checkPid();
568     bool signal = false;
569     size_t numAdded = 0;
570     {
571       folly::SpinLockGuard g(spinlock_);
572       checkDraining();
573       while (first != last) {
574         queue_.emplace_back(*first, RequestContext::saveContext());
575         ++first;
576         ++numAdded;
577       }
578       if (numActiveConsumers_ < numConsumers_) {
579         signal = true;
580       }
581     }
582     if (signal) {
583       signalEvent();
584     }
585   }
586
587   mutable folly::SpinLock spinlock_;
588   int eventfd_;
589   int pipeFds_[2]; // to fallback to on older/non-linux systems
590   uint32_t advisoryMaxQueueSize_;
591   pid_t pid_;
592   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
593   int numConsumers_{0};
594   std::atomic<int> numActiveConsumers_{0};
595   bool draining_{false};
596 };
597
598 template<typename MessageT>
599 void NotificationQueue<MessageT>::Consumer::destroy() {
600   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
601   // will be non-nullptr.  Mark the value that it points to, so that
602   // handlerReady() will know the callback is destroyed, and that it cannot
603   // access any member variables anymore.
604   if (destroyedFlagPtr_) {
605     *destroyedFlagPtr_ = true;
606   }
607   stopConsuming();
608   DelayedDestruction::destroy();
609 }
610
611 template<typename MessageT>
612 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
613     noexcept {
614   consumeMessages(false);
615 }
616
617 template<typename MessageT>
618 void NotificationQueue<MessageT>::Consumer::consumeMessages(
619     bool isDrain, size_t* numConsumed) noexcept {
620   DestructorGuard dg(this);
621   uint32_t numProcessed = 0;
622   bool firstRun = true;
623   setActive(true);
624   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
625   SCOPE_EXIT {
626     if (numConsumed != nullptr) {
627       *numConsumed = numProcessed;
628     }
629   };
630   while (true) {
631     // Try to decrement the eventfd.
632     //
633     // The eventfd is only used to wake up the consumer - there may or
634     // may not actually be an event available (another consumer may
635     // have read it).  We don't really care, we only care about
636     // emptying the queue.
637     if (!isDrain && firstRun) {
638       queue_->tryConsumeEvent();
639       firstRun = false;
640     }
641
642     // Now pop the message off of the queue.
643     //
644     // We have to manually acquire and release the spinlock here, rather than
645     // using SpinLockHolder since the MessageT has to be constructed while
646     // holding the spinlock and available after we release it.  SpinLockHolder
647     // unfortunately doesn't provide a release() method.  (We can't construct
648     // MessageT first since we have no guarantee that MessageT has a default
649     // constructor.
650     queue_->spinlock_.lock();
651     bool locked = true;
652
653     try {
654       if (UNLIKELY(queue_->queue_.empty())) {
655         // If there is no message, we've reached the end of the queue, return.
656         setActive(false);
657         queue_->spinlock_.unlock();
658         return;
659       }
660
661       // Pull a message off the queue.
662       auto& data = queue_->queue_.front();
663
664       MessageT msg(std::move(data.first));
665       auto old_ctx =
666         RequestContext::setContext(data.second);
667       queue_->queue_.pop_front();
668
669       // Check to see if the queue is empty now.
670       // We use this as an optimization to see if we should bother trying to
671       // loop again and read another message after invoking this callback.
672       bool wasEmpty = queue_->queue_.empty();
673       if (wasEmpty) {
674         setActive(false);
675       }
676
677       // Now unlock the spinlock before we invoke the callback.
678       queue_->spinlock_.unlock();
679       locked = false;
680
681       // Call the callback
682       bool callbackDestroyed = false;
683       CHECK(destroyedFlagPtr_ == nullptr);
684       destroyedFlagPtr_ = &callbackDestroyed;
685       messageAvailable(std::move(msg));
686       destroyedFlagPtr_ = nullptr;
687
688       RequestContext::setContext(old_ctx);
689
690       // If the callback was destroyed before it returned, we are done
691       if (callbackDestroyed) {
692         return;
693       }
694
695       // If the callback is no longer installed, we are done.
696       if (queue_ == nullptr) {
697         return;
698       }
699
700       // If we have hit maxReadAtOnce_, we are done.
701       ++numProcessed;
702       if (!isDrain && maxReadAtOnce_ > 0 &&
703           numProcessed >= maxReadAtOnce_) {
704         queue_->signalEvent(1);
705         return;
706       }
707
708       // If the queue was empty before we invoked the callback, it's probable
709       // that it is still empty now.  Just go ahead and return, rather than
710       // looping again and trying to re-read from the eventfd.  (If a new
711       // message had in fact arrived while we were invoking the callback, we
712       // will simply be woken up the next time around the event loop and will
713       // process the message then.)
714       if (wasEmpty) {
715         return;
716       }
717     } catch (const std::exception& ex) {
718       // This catch block is really just to handle the case where the MessageT
719       // constructor throws.  The messageAvailable() callback itself is
720       // declared as noexcept and should never throw.
721       //
722       // If the MessageT constructor does throw we try to handle it as best as
723       // we can, but we can't work miracles.  We will just ignore the error for
724       // now and return.  The next time around the event loop we will end up
725       // trying to read the message again.  If MessageT continues to throw we
726       // will never make forward progress and will keep trying each time around
727       // the event loop.
728       if (locked) {
729         // Unlock the spinlock.
730         queue_->spinlock_.unlock();
731
732         // Push a notification back on the eventfd since we didn't actually
733         // read the message off of the queue.
734         if (!isDrain) {
735           queue_->signalEvent(1);
736         }
737       }
738
739       return;
740     }
741   }
742 }
743
744 template<typename MessageT>
745 void NotificationQueue<MessageT>::Consumer::init(
746     EventBase* eventBase,
747     NotificationQueue* queue) {
748   assert(eventBase->isInEventBaseThread());
749   assert(queue_ == nullptr);
750   assert(!isHandlerRegistered());
751   queue->checkPid();
752
753   base_ = eventBase;
754
755   queue_ = queue;
756
757   {
758     folly::SpinLockGuard g(queue_->spinlock_);
759     queue_->numConsumers_++;
760   }
761   queue_->signalEvent();
762
763   if (queue_->eventfd_ >= 0) {
764     initHandler(eventBase, queue_->eventfd_);
765   } else {
766     initHandler(eventBase, queue_->pipeFds_[0]);
767   }
768 }
769
770 template<typename MessageT>
771 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
772   if (queue_ == nullptr) {
773     assert(!isHandlerRegistered());
774     return;
775   }
776
777   {
778     folly::SpinLockGuard g(queue_->spinlock_);
779     queue_->numConsumers_--;
780     setActive(false);
781   }
782
783   assert(isHandlerRegistered());
784   unregisterHandler();
785   detachEventBase();
786   queue_ = nullptr;
787 }
788
789 template<typename MessageT>
790 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
791     size_t* numConsumed) noexcept {
792   DestructorGuard dg(this);
793   {
794     folly::SpinLockGuard g(queue_->spinlock_);
795     if (queue_->draining_) {
796       return false;
797     }
798     queue_->draining_ = true;
799   }
800   consumeMessages(true, numConsumed);
801   {
802     folly::SpinLockGuard g(queue_->spinlock_);
803     queue_->draining_ = false;
804   }
805   return true;
806 }
807
808 /**
809  * Creates a NotificationQueue::Consumer wrapping a function object
810  * Modeled after AsyncTimeout::make
811  *
812  */
813
814 namespace detail {
815
816 template <typename MessageT, typename TCallback>
817 struct notification_queue_consumer_wrapper
818     : public NotificationQueue<MessageT>::Consumer {
819
820   template <typename UCallback>
821   explicit notification_queue_consumer_wrapper(UCallback&& callback)
822       : callback_(std::forward<UCallback>(callback)) {}
823
824   // we are being stricter here and requiring noexcept for callback
825   void messageAvailable(MessageT&& message) override {
826     static_assert(
827       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
828       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
829     );
830
831     callback_(std::forward<MessageT>(message));
832   }
833
834  private:
835   TCallback callback_;
836 };
837
838 } // namespace detail
839
840 template <typename MessageT>
841 template <typename TCallback>
842 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
843                 DelayedDestruction::Destructor>
844 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
845   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
846                          DelayedDestruction::Destructor>(
847       new detail::notification_queue_consumer_wrapper<
848           MessageT,
849           typename std::decay<TCallback>::type>(
850           std::forward<TCallback>(callback)));
851 }
852
853 } // folly