Updates the internal representation of signals in NotificationQueue
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <poll.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <deque>
26 #include <iterator>
27 #include <memory>
28 #include <stdexcept>
29 #include <utility>
30
31 #include <folly/FileUtil.h>
32 #include <folly/io/async/EventBase.h>
33 #include <folly/io/async/EventHandler.h>
34 #include <folly/io/async/DelayedDestruction.h>
35 #include <folly/io/async/Request.h>
36 #include <folly/Likely.h>
37 #include <folly/ScopeGuard.h>
38 #include <folly/SpinLock.h>
39
40 #include <glog/logging.h>
41
42 #if __linux__ && !__ANDROID__
43 #define FOLLY_HAVE_EVENTFD
44 #include <folly/io/async/EventFDWrapper.h>
45 #endif
46
47 namespace folly {
48
49 /**
50  * A producer-consumer queue for passing messages between EventBase threads.
51  *
52  * Messages can be added to the queue from any thread.  Multiple consumers may
53  * listen to the queue from multiple EventBase threads.
54  *
55  * A NotificationQueue may not be destroyed while there are still consumers
56  * registered to receive events from the queue.  It is the user's
57  * responsibility to ensure that all consumers are unregistered before the
58  * queue is destroyed.
59  *
60  * MessageT should be MoveConstructible (i.e., must support either a move
61  * constructor or a copy constructor, or both).  Ideally it's move constructor
62  * (or copy constructor if no move constructor is provided) should never throw
63  * exceptions.  If the constructor may throw, the consumers could end up
64  * spinning trying to move a message off the queue and failing, and then
65  * retrying.
66  */
67 template<typename MessageT>
68 class NotificationQueue {
69  public:
70   /**
71    * A callback interface for consuming messages from the queue as they arrive.
72    */
73   class Consumer : public DelayedDestruction, private EventHandler {
74    public:
75     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
76
77     Consumer()
78       : queue_(nullptr),
79         destroyedFlagPtr_(nullptr),
80         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
81
82     // create a consumer in-place, without the need to build new class
83     template <typename TCallback>
84     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
85         TCallback&& callback);
86
87     /**
88      * messageAvailable() will be invoked whenever a new
89      * message is available from the pipe.
90      */
91     virtual void messageAvailable(MessageT&& message) = 0;
92
93     /**
94      * Begin consuming messages from the specified queue.
95      *
96      * messageAvailable() will be called whenever a message is available.  This
97      * consumer will continue to consume messages until stopConsuming() is
98      * called.
99      *
100      * A Consumer may only consume messages from a single NotificationQueue at
101      * a time.  startConsuming() should not be called if this consumer is
102      * already consuming.
103      */
104     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
105       init(eventBase, queue);
106       registerHandler(READ | PERSIST);
107     }
108
109     /**
110      * Same as above but registers this event handler as internal so that it
111      * doesn't count towards the pending reader count for the IOLoop.
112      */
113     void startConsumingInternal(
114         EventBase* eventBase, NotificationQueue* queue) {
115       init(eventBase, queue);
116       registerInternalHandler(READ | PERSIST);
117     }
118
119     /**
120      * Stop consuming messages.
121      *
122      * startConsuming() may be called again to resume consumption of messages
123      * at a later point in time.
124      */
125     void stopConsuming();
126
127     /**
128      * Consume messages off the queue until it is empty. No messages may be
129      * added to the queue while it is draining, so that the process is bounded.
130      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
131      * and tryPutMessageNoThrow will return false.
132      *
133      * @returns true if the queue was drained, false otherwise. In practice,
134      * this will only fail if someone else is already draining the queue.
135      */
136     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
137
138     /**
139      * Get the NotificationQueue that this consumer is currently consuming
140      * messages from.  Returns nullptr if the consumer is not currently
141      * consuming events from any queue.
142      */
143     NotificationQueue* getCurrentQueue() const {
144       return queue_;
145     }
146
147     /**
148      * Set a limit on how many messages this consumer will read each iteration
149      * around the event loop.
150      *
151      * This helps rate-limit how much work the Consumer will do each event loop
152      * iteration, to prevent it from starving other event handlers.
153      *
154      * A limit of 0 means no limit will be enforced.  If unset, the limit
155      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
156      */
157     void setMaxReadAtOnce(uint32_t maxAtOnce) {
158       maxReadAtOnce_ = maxAtOnce;
159     }
160     uint32_t getMaxReadAtOnce() const {
161       return maxReadAtOnce_;
162     }
163
164     EventBase* getEventBase() {
165       return base_;
166     }
167
168     void handlerReady(uint16_t events) noexcept override;
169
170    protected:
171
172     void destroy() override;
173
174     virtual ~Consumer() {}
175
176    private:
177     /**
178      * Consume messages off the the queue until
179      *   - the queue is empty (1), or
180      *   - until the consumer is destroyed, or
181      *   - until the consumer is uninstalled, or
182      *   - an exception is thrown in the course of dequeueing, or
183      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
184      *
185      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
186      */
187     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
188
189     void setActive(bool active, bool shouldLock = false) {
190       if (!queue_) {
191         active_ = active;
192         return;
193       }
194       if (shouldLock) {
195         queue_->spinlock_.lock();
196       }
197       if (!active_ && active) {
198         ++queue_->numActiveConsumers_;
199       } else if (active_ && !active) {
200         --queue_->numActiveConsumers_;
201       }
202       active_ = active;
203       if (shouldLock) {
204         queue_->spinlock_.unlock();
205       }
206     }
207     void init(EventBase* eventBase, NotificationQueue* queue);
208
209     NotificationQueue* queue_;
210     bool* destroyedFlagPtr_;
211     uint32_t maxReadAtOnce_;
212     EventBase* base_;
213     bool active_{false};
214   };
215
216   enum class FdType {
217     PIPE,
218 #ifdef FOLLY_HAVE_EVENTFD
219     EVENTFD,
220 #endif
221   };
222
223   /**
224    * Create a new NotificationQueue.
225    *
226    * If the maxSize parameter is specified, this sets the maximum queue size
227    * that will be enforced by tryPutMessage().  (This size is advisory, and may
228    * be exceeded if producers explicitly use putMessage() instead of
229    * tryPutMessage().)
230    *
231    * The fdType parameter determines the type of file descriptor used
232    * internally to signal message availability.  The default (eventfd) is
233    * preferable for performance and because it won't fail when the queue gets
234    * too long.  It is not available on on older and non-linux kernels, however.
235    * In this case the code will fall back to using a pipe, the parameter is
236    * mostly for testing purposes.
237    */
238   explicit NotificationQueue(uint32_t maxSize = 0,
239 #ifdef FOLLY_HAVE_EVENTFD
240                              FdType fdType = FdType::EVENTFD)
241 #else
242                              FdType fdType = FdType::PIPE)
243 #endif
244       : eventfd_(-1),
245         pipeFds_{-1, -1},
246         advisoryMaxQueueSize_(maxSize),
247         pid_(pid_t(getpid())),
248         queue_() {
249
250     RequestContext::saveContext();
251
252 #ifdef FOLLY_HAVE_EVENTFD
253     if (fdType == FdType::EVENTFD) {
254       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
255       if (eventfd_ == -1) {
256         if (errno == ENOSYS || errno == EINVAL) {
257           // eventfd not availalble
258           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
259                      << errno << ", falling back to pipe mode (is your kernel "
260                      << "> 2.6.30?)";
261           fdType = FdType::PIPE;
262         } else {
263           // some other error
264           folly::throwSystemError("Failed to create eventfd for "
265                                   "NotificationQueue", errno);
266         }
267       }
268     }
269 #endif
270     if (fdType == FdType::PIPE) {
271       if (pipe(pipeFds_)) {
272         folly::throwSystemError("Failed to create pipe for NotificationQueue",
273                                 errno);
274       }
275       try {
276         // put both ends of the pipe into non-blocking mode
277         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
278           folly::throwSystemError("failed to put NotificationQueue pipe read "
279                                   "endpoint into non-blocking mode", errno);
280         }
281         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
282           folly::throwSystemError("failed to put NotificationQueue pipe write "
283                                   "endpoint into non-blocking mode", errno);
284         }
285       } catch (...) {
286         ::close(pipeFds_[0]);
287         ::close(pipeFds_[1]);
288         throw;
289       }
290     }
291   }
292
293   ~NotificationQueue() {
294     if (eventfd_ >= 0) {
295       ::close(eventfd_);
296       eventfd_ = -1;
297     }
298     if (pipeFds_[0] >= 0) {
299       ::close(pipeFds_[0]);
300       pipeFds_[0] = -1;
301     }
302     if (pipeFds_[1] >= 0) {
303       ::close(pipeFds_[1]);
304       pipeFds_[1] = -1;
305     }
306   }
307
308   /**
309    * Set the advisory maximum queue size.
310    *
311    * This maximum queue size affects calls to tryPutMessage().  Message
312    * producers can still use the putMessage() call to unconditionally put a
313    * message on the queue, ignoring the configured maximum queue size.  This
314    * can cause the queue size to exceed the configured maximum.
315    */
316   void setMaxQueueSize(uint32_t max) {
317     advisoryMaxQueueSize_ = max;
318   }
319
320   /**
321    * Attempt to put a message on the queue if the queue is not already full.
322    *
323    * If the queue is full, a std::overflow_error will be thrown.  The
324    * setMaxQueueSize() function controls the maximum queue size.
325    *
326    * If the queue is currently draining, an std::runtime_error will be thrown.
327    *
328    * This method may contend briefly on a spinlock if many threads are
329    * concurrently accessing the queue, but for all intents and purposes it will
330    * immediately place the message on the queue and return.
331    *
332    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
333    * may throw any other exception thrown by the MessageT move/copy
334    * constructor.
335    */
336   void tryPutMessage(MessageT&& message) {
337     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
338   }
339   void tryPutMessage(const MessageT& message) {
340     putMessageImpl(message, advisoryMaxQueueSize_);
341   }
342
343   /**
344    * No-throw versions of the above.  Instead returns true on success, false on
345    * failure.
346    *
347    * Only std::overflow_error (the common exception case) and std::runtime_error
348    * (which indicates that the queue is being drained) are prevented from being
349    * thrown. User code must still catch std::bad_alloc errors.
350    */
351   bool tryPutMessageNoThrow(MessageT&& message) {
352     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
353   }
354   bool tryPutMessageNoThrow(const MessageT& message) {
355     return putMessageImpl(message, advisoryMaxQueueSize_, false);
356   }
357
358   /**
359    * Unconditionally put a message on the queue.
360    *
361    * This method is like tryPutMessage(), but ignores the maximum queue size
362    * and always puts the message on the queue, even if the maximum queue size
363    * would be exceeded.
364    *
365    * putMessage() may throw
366    *   - std::bad_alloc if memory allocation fails, and may
367    *   - std::runtime_error if the queue is currently draining
368    *   - any other exception thrown by the MessageT move/copy constructor.
369    */
370   void putMessage(MessageT&& message) {
371     putMessageImpl(std::move(message), 0);
372   }
373   void putMessage(const MessageT& message) {
374     putMessageImpl(message, 0);
375   }
376
377   /**
378    * Put several messages on the queue.
379    */
380   template<typename InputIteratorT>
381   void putMessages(InputIteratorT first, InputIteratorT last) {
382     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
383       IterCategory;
384     putMessagesImpl(first, last, IterCategory());
385   }
386
387   /**
388    * Try to immediately pull a message off of the queue, without blocking.
389    *
390    * If a message is immediately available, the result parameter will be
391    * updated to contain the message contents and true will be returned.
392    *
393    * If no message is available, false will be returned and result will be left
394    * unmodified.
395    */
396   bool tryConsume(MessageT& result) {
397     SCOPE_EXIT { syncSignalAndQueue(); };
398
399     checkPid();
400
401     folly::SpinLockGuard g(spinlock_);
402
403     if (UNLIKELY(queue_.empty())) {
404       return false;
405     }
406
407     auto data = std::move(queue_.front());
408     result = data.first;
409     RequestContext::setContext(data.second);
410
411     queue_.pop_front();
412
413     return true;
414   }
415
416   size_t size() const {
417     folly::SpinLockGuard g(spinlock_);
418     return queue_.size();
419   }
420
421   /**
422    * Check that the NotificationQueue is being used from the correct process.
423    *
424    * If you create a NotificationQueue in one process, then fork, and try to
425    * send messages to the queue from the child process, you're going to have a
426    * bad time.  Unfortunately users have (accidentally) run into this.
427    *
428    * Because we use an eventfd/pipe, the child process can actually signal the
429    * parent process that an event is ready.  However, it can't put anything on
430    * the parent's queue, so the parent wakes up and finds an empty queue.  This
431    * check ensures that we catch the problem in the misbehaving child process
432    * code, and crash before signalling the parent process.
433    */
434   void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
435
436  private:
437   // Forbidden copy constructor and assignment operator
438   NotificationQueue(NotificationQueue const &) = delete;
439   NotificationQueue& operator=(NotificationQueue const &) = delete;
440
441   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
442     DCHECK(0 == spinlock_.trylock());
443     if (maxSize > 0 && queue_.size() >= maxSize) {
444       if (throws) {
445         throw std::overflow_error("unable to add message to NotificationQueue: "
446                                   "queue is full");
447       }
448       return false;
449     }
450     return true;
451   }
452
453   inline bool checkDraining(bool throws=true) {
454     if (UNLIKELY(draining_ && throws)) {
455       throw std::runtime_error("queue is draining, cannot add message");
456     }
457     return draining_;
458   }
459
460 #ifdef __ANDROID__
461   // TODO 10860938 Remove after figuring out crash
462   mutable std::atomic<int> eventBytes_{0};
463   mutable std::atomic<int> maxEventBytes_{0};
464 #endif
465
466   void ensureSignalLocked() const {
467     // semantics: empty fd == empty queue <=> !signal_
468     if (signal_) {
469       return;
470     }
471
472     ssize_t bytes_written = 0;
473     ssize_t bytes_expected = 0;
474
475     do {
476       if (eventfd_ >= 0) {
477         // eventfd(2) dictates that we must write a 64-bit integer
478         uint64_t signal = 1;
479         bytes_expected = static_cast<ssize_t>(sizeof(signal));
480         bytes_written = ::write(eventfd_, &signal, bytes_expected);
481       } else {
482         uint8_t signal = 1;
483         bytes_expected = static_cast<ssize_t>(sizeof(signal));
484         bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
485       }
486     } while (bytes_written == -1 && errno == EINTR);
487
488 #ifdef __ANDROID__
489     if (bytes_written > 0) {
490       eventBytes_ += bytes_written;
491       maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
492     }
493 #endif
494
495     if (bytes_written == bytes_expected) {
496       signal_ = true;
497     } else {
498 #ifdef __ANDROID__
499       LOG(ERROR) << "NotificationQueue Write Error=" << errno
500                  << " bytesInPipe=" << eventBytes_
501                  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
502 #endif
503       folly::throwSystemError("failed to signal NotificationQueue after "
504                               "write", errno);
505     }
506   }
507
508   void drainSignalsLocked() {
509     ssize_t bytes_read = 0;
510     if (eventfd_ > 0) {
511       uint64_t message;
512       bytes_read = readNoInt(eventfd_, &message, sizeof(message));
513       CHECK(bytes_read != -1 || errno == EAGAIN);
514     } else {
515       // There should only be one byte in the pipe. To avoid potential leaks we still drain.
516       uint8_t message[32];
517       ssize_t result;
518       while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
519         bytes_read += result;
520       }
521       CHECK(result == -1 && errno == EAGAIN);
522       LOG_IF(ERROR, bytes_read > 1)
523         << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
524         << bytes_read << " bytes, expected <= 1";
525     }
526     LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
527       << "[NotificationQueue] Unexpected state while draining signals: signal_="
528       << signal_ << " bytes_read=" << bytes_read;
529
530     signal_ = false;
531
532 #ifdef __ANDROID__
533     if (bytes_read > 0) {
534       eventBytes_ -= bytes_read;
535     }
536 #endif
537   }
538
539   void ensureSignal() const {
540     folly::SpinLockGuard g(spinlock_);
541     ensureSignalLocked();
542   }
543
544   void syncSignalAndQueue() {
545     folly::SpinLockGuard g(spinlock_);
546
547     if (queue_.empty()) {
548       drainSignalsLocked();
549     } else {
550       ensureSignalLocked();
551     }
552   }
553
554   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
555     checkPid();
556     bool signal = false;
557     {
558       folly::SpinLockGuard g(spinlock_);
559       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
560         return false;
561       }
562       // We only need to signal an event if not all consumers are
563       // awake.
564       if (numActiveConsumers_ < numConsumers_) {
565         signal = true;
566       }
567       queue_.emplace_back(std::move(message), RequestContext::saveContext());
568       if (signal) {
569         ensureSignalLocked();
570       }
571     }
572     return true;
573   }
574
575   bool putMessageImpl(
576     const MessageT& message, size_t maxSize, bool throws=true) {
577     checkPid();
578     bool signal = false;
579     {
580       folly::SpinLockGuard g(spinlock_);
581       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
582         return false;
583       }
584       if (numActiveConsumers_ < numConsumers_) {
585         signal = true;
586       }
587       queue_.emplace_back(message, RequestContext::saveContext());
588       if (signal) {
589         ensureSignalLocked();
590       }
591     }
592     return true;
593   }
594
595   template<typename InputIteratorT>
596   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
597                        std::input_iterator_tag) {
598     checkPid();
599     bool signal = false;
600     size_t numAdded = 0;
601     {
602       folly::SpinLockGuard g(spinlock_);
603       checkDraining();
604       while (first != last) {
605         queue_.emplace_back(*first, RequestContext::saveContext());
606         ++first;
607         ++numAdded;
608       }
609       if (numActiveConsumers_ < numConsumers_) {
610         signal = true;
611       }
612       if (signal) {
613         ensureSignalLocked();
614       }
615     }
616   }
617
618   mutable folly::SpinLock spinlock_;
619   mutable bool signal_{false};
620   int eventfd_;
621   int pipeFds_[2]; // to fallback to on older/non-linux systems
622   uint32_t advisoryMaxQueueSize_;
623   pid_t pid_;
624   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
625   int numConsumers_{0};
626   std::atomic<int> numActiveConsumers_{0};
627   bool draining_{false};
628 };
629
630 template<typename MessageT>
631 void NotificationQueue<MessageT>::Consumer::destroy() {
632   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
633   // will be non-nullptr.  Mark the value that it points to, so that
634   // handlerReady() will know the callback is destroyed, and that it cannot
635   // access any member variables anymore.
636   if (destroyedFlagPtr_) {
637     *destroyedFlagPtr_ = true;
638   }
639   stopConsuming();
640   DelayedDestruction::destroy();
641 }
642
643 template<typename MessageT>
644 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
645     noexcept {
646   consumeMessages(false);
647 }
648
649 template<typename MessageT>
650 void NotificationQueue<MessageT>::Consumer::consumeMessages(
651     bool isDrain, size_t* numConsumed) noexcept {
652   DestructorGuard dg(this);
653   uint32_t numProcessed = 0;
654   setActive(true);
655   SCOPE_EXIT {
656     if (queue_) {
657       queue_->syncSignalAndQueue();
658     }
659   };
660   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
661   SCOPE_EXIT {
662     if (numConsumed != nullptr) {
663       *numConsumed = numProcessed;
664     }
665   };
666   while (true) {
667     // Now pop the message off of the queue.
668     //
669     // We have to manually acquire and release the spinlock here, rather than
670     // using SpinLockHolder since the MessageT has to be constructed while
671     // holding the spinlock and available after we release it.  SpinLockHolder
672     // unfortunately doesn't provide a release() method.  (We can't construct
673     // MessageT first since we have no guarantee that MessageT has a default
674     // constructor.
675     queue_->spinlock_.lock();
676     bool locked = true;
677
678     try {
679       if (UNLIKELY(queue_->queue_.empty())) {
680         // If there is no message, we've reached the end of the queue, return.
681         setActive(false);
682         queue_->spinlock_.unlock();
683         return;
684       }
685
686       // Pull a message off the queue.
687       auto& data = queue_->queue_.front();
688
689       MessageT msg(std::move(data.first));
690       auto old_ctx =
691         RequestContext::setContext(data.second);
692       queue_->queue_.pop_front();
693
694       // Check to see if the queue is empty now.
695       // We use this as an optimization to see if we should bother trying to
696       // loop again and read another message after invoking this callback.
697       bool wasEmpty = queue_->queue_.empty();
698       if (wasEmpty) {
699         setActive(false);
700       }
701
702       // Now unlock the spinlock before we invoke the callback.
703       queue_->spinlock_.unlock();
704       locked = false;
705
706       // Call the callback
707       bool callbackDestroyed = false;
708       CHECK(destroyedFlagPtr_ == nullptr);
709       destroyedFlagPtr_ = &callbackDestroyed;
710       messageAvailable(std::move(msg));
711       destroyedFlagPtr_ = nullptr;
712
713       RequestContext::setContext(old_ctx);
714
715       // If the callback was destroyed before it returned, we are done
716       if (callbackDestroyed) {
717         return;
718       }
719
720       // If the callback is no longer installed, we are done.
721       if (queue_ == nullptr) {
722         return;
723       }
724
725       // If we have hit maxReadAtOnce_, we are done.
726       ++numProcessed;
727       if (!isDrain && maxReadAtOnce_ > 0 &&
728           numProcessed >= maxReadAtOnce_) {
729         return;
730       }
731
732       // If the queue was empty before we invoked the callback, it's probable
733       // that it is still empty now.  Just go ahead and return, rather than
734       // looping again and trying to re-read from the eventfd.  (If a new
735       // message had in fact arrived while we were invoking the callback, we
736       // will simply be woken up the next time around the event loop and will
737       // process the message then.)
738       if (wasEmpty) {
739         return;
740       }
741     } catch (const std::exception& ex) {
742       // This catch block is really just to handle the case where the MessageT
743       // constructor throws.  The messageAvailable() callback itself is
744       // declared as noexcept and should never throw.
745       //
746       // If the MessageT constructor does throw we try to handle it as best as
747       // we can, but we can't work miracles.  We will just ignore the error for
748       // now and return.  The next time around the event loop we will end up
749       // trying to read the message again.  If MessageT continues to throw we
750       // will never make forward progress and will keep trying each time around
751       // the event loop.
752       if (locked) {
753         // Unlock the spinlock.
754         queue_->spinlock_.unlock();
755       }
756
757       return;
758     }
759   }
760 }
761
762 template<typename MessageT>
763 void NotificationQueue<MessageT>::Consumer::init(
764     EventBase* eventBase,
765     NotificationQueue* queue) {
766   assert(eventBase->isInEventBaseThread());
767   assert(queue_ == nullptr);
768   assert(!isHandlerRegistered());
769   queue->checkPid();
770
771   base_ = eventBase;
772
773   queue_ = queue;
774
775   {
776     folly::SpinLockGuard g(queue_->spinlock_);
777     queue_->numConsumers_++;
778   }
779   queue_->ensureSignal();
780
781   if (queue_->eventfd_ >= 0) {
782     initHandler(eventBase, queue_->eventfd_);
783   } else {
784     initHandler(eventBase, queue_->pipeFds_[0]);
785   }
786 }
787
788 template<typename MessageT>
789 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
790   if (queue_ == nullptr) {
791     assert(!isHandlerRegistered());
792     return;
793   }
794
795   {
796     folly::SpinLockGuard g(queue_->spinlock_);
797     queue_->numConsumers_--;
798     setActive(false);
799   }
800
801   assert(isHandlerRegistered());
802   unregisterHandler();
803   detachEventBase();
804   queue_ = nullptr;
805 }
806
807 template<typename MessageT>
808 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
809     size_t* numConsumed) noexcept {
810   DestructorGuard dg(this);
811   {
812     folly::SpinLockGuard g(queue_->spinlock_);
813     if (queue_->draining_) {
814       return false;
815     }
816     queue_->draining_ = true;
817   }
818   consumeMessages(true, numConsumed);
819   {
820     folly::SpinLockGuard g(queue_->spinlock_);
821     queue_->draining_ = false;
822   }
823   return true;
824 }
825
826 /**
827  * Creates a NotificationQueue::Consumer wrapping a function object
828  * Modeled after AsyncTimeout::make
829  *
830  */
831
832 namespace detail {
833
834 template <typename MessageT, typename TCallback>
835 struct notification_queue_consumer_wrapper
836     : public NotificationQueue<MessageT>::Consumer {
837
838   template <typename UCallback>
839   explicit notification_queue_consumer_wrapper(UCallback&& callback)
840       : callback_(std::forward<UCallback>(callback)) {}
841
842   // we are being stricter here and requiring noexcept for callback
843   void messageAvailable(MessageT&& message) override {
844     static_assert(
845       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
846       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
847     );
848
849     callback_(std::forward<MessageT>(message));
850   }
851
852  private:
853   TCallback callback_;
854 };
855
856 } // namespace detail
857
858 template <typename MessageT>
859 template <typename TCallback>
860 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
861                 DelayedDestruction::Destructor>
862 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
863   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
864                          DelayedDestruction::Destructor>(
865       new detail::notification_queue_consumer_wrapper<
866           MessageT,
867           typename std::decay<TCallback>::type>(
868           std::forward<TCallback>(callback)));
869 }
870
871 } // folly