NotificationQueue Logging for Android bug
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22
23 #include <algorithm>
24 #include <deque>
25 #include <iterator>
26 #include <memory>
27 #include <stdexcept>
28 #include <utility>
29
30 #include <folly/FileUtil.h>
31 #include <folly/io/async/EventBase.h>
32 #include <folly/io/async/EventHandler.h>
33 #include <folly/io/async/DelayedDestruction.h>
34 #include <folly/io/async/Request.h>
35 #include <folly/Likely.h>
36 #include <folly/ScopeGuard.h>
37 #include <folly/SpinLock.h>
38
39 #include <glog/logging.h>
40
41 #if __linux__ && !__ANDROID__
42 #define FOLLY_HAVE_EVENTFD
43 #include <folly/io/async/EventFDWrapper.h>
44 #endif
45
46 namespace folly {
47
48 /**
49  * A producer-consumer queue for passing messages between EventBase threads.
50  *
51  * Messages can be added to the queue from any thread.  Multiple consumers may
52  * listen to the queue from multiple EventBase threads.
53  *
54  * A NotificationQueue may not be destroyed while there are still consumers
55  * registered to receive events from the queue.  It is the user's
56  * responsibility to ensure that all consumers are unregistered before the
57  * queue is destroyed.
58  *
59  * MessageT should be MoveConstructible (i.e., must support either a move
60  * constructor or a copy constructor, or both).  Ideally it's move constructor
61  * (or copy constructor if no move constructor is provided) should never throw
62  * exceptions.  If the constructor may throw, the consumers could end up
63  * spinning trying to move a message off the queue and failing, and then
64  * retrying.
65  */
66 template<typename MessageT>
67 class NotificationQueue {
68  public:
69   /**
70    * A callback interface for consuming messages from the queue as they arrive.
71    */
72   class Consumer : public DelayedDestruction, private EventHandler {
73    public:
74     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
75
76     Consumer()
77       : queue_(nullptr),
78         destroyedFlagPtr_(nullptr),
79         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
80
81     // create a consumer in-place, without the need to build new class
82     template <typename TCallback>
83     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
84         TCallback&& callback);
85
86     /**
87      * messageAvailable() will be invoked whenever a new
88      * message is available from the pipe.
89      */
90     virtual void messageAvailable(MessageT&& message) = 0;
91
92     /**
93      * Begin consuming messages from the specified queue.
94      *
95      * messageAvailable() will be called whenever a message is available.  This
96      * consumer will continue to consume messages until stopConsuming() is
97      * called.
98      *
99      * A Consumer may only consume messages from a single NotificationQueue at
100      * a time.  startConsuming() should not be called if this consumer is
101      * already consuming.
102      */
103     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
104       init(eventBase, queue);
105       registerHandler(READ | PERSIST);
106     }
107
108     /**
109      * Same as above but registers this event handler as internal so that it
110      * doesn't count towards the pending reader count for the IOLoop.
111      */
112     void startConsumingInternal(
113         EventBase* eventBase, NotificationQueue* queue) {
114       init(eventBase, queue);
115       registerInternalHandler(READ | PERSIST);
116     }
117
118     /**
119      * Stop consuming messages.
120      *
121      * startConsuming() may be called again to resume consumption of messages
122      * at a later point in time.
123      */
124     void stopConsuming();
125
126     /**
127      * Consume messages off the queue until it is empty. No messages may be
128      * added to the queue while it is draining, so that the process is bounded.
129      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
130      * and tryPutMessageNoThrow will return false.
131      *
132      * @returns true if the queue was drained, false otherwise. In practice,
133      * this will only fail if someone else is already draining the queue.
134      */
135     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
136
137     /**
138      * Get the NotificationQueue that this consumer is currently consuming
139      * messages from.  Returns nullptr if the consumer is not currently
140      * consuming events from any queue.
141      */
142     NotificationQueue* getCurrentQueue() const {
143       return queue_;
144     }
145
146     /**
147      * Set a limit on how many messages this consumer will read each iteration
148      * around the event loop.
149      *
150      * This helps rate-limit how much work the Consumer will do each event loop
151      * iteration, to prevent it from starving other event handlers.
152      *
153      * A limit of 0 means no limit will be enforced.  If unset, the limit
154      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
155      */
156     void setMaxReadAtOnce(uint32_t maxAtOnce) {
157       maxReadAtOnce_ = maxAtOnce;
158     }
159     uint32_t getMaxReadAtOnce() const {
160       return maxReadAtOnce_;
161     }
162
163     EventBase* getEventBase() {
164       return base_;
165     }
166
167     void handlerReady(uint16_t events) noexcept override;
168
169    protected:
170
171     void destroy() override;
172
173     virtual ~Consumer() {}
174
175    private:
176     /**
177      * Consume messages off the the queue until
178      *   - the queue is empty (1), or
179      *   - until the consumer is destroyed, or
180      *   - until the consumer is uninstalled, or
181      *   - an exception is thrown in the course of dequeueing, or
182      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
183      *
184      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
185      */
186     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
187
188     void setActive(bool active, bool shouldLock = false) {
189       if (!queue_) {
190         active_ = active;
191         return;
192       }
193       if (shouldLock) {
194         queue_->spinlock_.lock();
195       }
196       if (!active_ && active) {
197         ++queue_->numActiveConsumers_;
198       } else if (active_ && !active) {
199         --queue_->numActiveConsumers_;
200       }
201       active_ = active;
202       if (shouldLock) {
203         queue_->spinlock_.unlock();
204       }
205     }
206     void init(EventBase* eventBase, NotificationQueue* queue);
207
208     NotificationQueue* queue_;
209     bool* destroyedFlagPtr_;
210     uint32_t maxReadAtOnce_;
211     EventBase* base_;
212     bool active_{false};
213   };
214
215   enum class FdType {
216     PIPE,
217 #ifdef FOLLY_HAVE_EVENTFD
218     EVENTFD,
219 #endif
220   };
221
222   /**
223    * Create a new NotificationQueue.
224    *
225    * If the maxSize parameter is specified, this sets the maximum queue size
226    * that will be enforced by tryPutMessage().  (This size is advisory, and may
227    * be exceeded if producers explicitly use putMessage() instead of
228    * tryPutMessage().)
229    *
230    * The fdType parameter determines the type of file descriptor used
231    * internally to signal message availability.  The default (eventfd) is
232    * preferable for performance and because it won't fail when the queue gets
233    * too long.  It is not available on on older and non-linux kernels, however.
234    * In this case the code will fall back to using a pipe, the parameter is
235    * mostly for testing purposes.
236    */
237   explicit NotificationQueue(uint32_t maxSize = 0,
238 #ifdef FOLLY_HAVE_EVENTFD
239                              FdType fdType = FdType::EVENTFD)
240 #else
241                              FdType fdType = FdType::PIPE)
242 #endif
243       : eventfd_(-1),
244         pipeFds_{-1, -1},
245         advisoryMaxQueueSize_(maxSize),
246         pid_(pid_t(getpid())),
247         queue_() {
248
249     RequestContext::saveContext();
250
251 #ifdef FOLLY_HAVE_EVENTFD
252     if (fdType == FdType::EVENTFD) {
253       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
254       if (eventfd_ == -1) {
255         if (errno == ENOSYS || errno == EINVAL) {
256           // eventfd not availalble
257           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
258                      << errno << ", falling back to pipe mode (is your kernel "
259                      << "> 2.6.30?)";
260           fdType = FdType::PIPE;
261         } else {
262           // some other error
263           folly::throwSystemError("Failed to create eventfd for "
264                                   "NotificationQueue", errno);
265         }
266       }
267     }
268 #endif
269     if (fdType == FdType::PIPE) {
270       if (pipe(pipeFds_)) {
271         folly::throwSystemError("Failed to create pipe for NotificationQueue",
272                                 errno);
273       }
274       try {
275         // put both ends of the pipe into non-blocking mode
276         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
277           folly::throwSystemError("failed to put NotificationQueue pipe read "
278                                   "endpoint into non-blocking mode", errno);
279         }
280         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
281           folly::throwSystemError("failed to put NotificationQueue pipe write "
282                                   "endpoint into non-blocking mode", errno);
283         }
284       } catch (...) {
285         ::close(pipeFds_[0]);
286         ::close(pipeFds_[1]);
287         throw;
288       }
289     }
290   }
291
292   ~NotificationQueue() {
293     if (eventfd_ >= 0) {
294       ::close(eventfd_);
295       eventfd_ = -1;
296     }
297     if (pipeFds_[0] >= 0) {
298       ::close(pipeFds_[0]);
299       pipeFds_[0] = -1;
300     }
301     if (pipeFds_[1] >= 0) {
302       ::close(pipeFds_[1]);
303       pipeFds_[1] = -1;
304     }
305   }
306
307   /**
308    * Set the advisory maximum queue size.
309    *
310    * This maximum queue size affects calls to tryPutMessage().  Message
311    * producers can still use the putMessage() call to unconditionally put a
312    * message on the queue, ignoring the configured maximum queue size.  This
313    * can cause the queue size to exceed the configured maximum.
314    */
315   void setMaxQueueSize(uint32_t max) {
316     advisoryMaxQueueSize_ = max;
317   }
318
319   /**
320    * Attempt to put a message on the queue if the queue is not already full.
321    *
322    * If the queue is full, a std::overflow_error will be thrown.  The
323    * setMaxQueueSize() function controls the maximum queue size.
324    *
325    * If the queue is currently draining, an std::runtime_error will be thrown.
326    *
327    * This method may contend briefly on a spinlock if many threads are
328    * concurrently accessing the queue, but for all intents and purposes it will
329    * immediately place the message on the queue and return.
330    *
331    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
332    * may throw any other exception thrown by the MessageT move/copy
333    * constructor.
334    */
335   void tryPutMessage(MessageT&& message) {
336     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
337   }
338   void tryPutMessage(const MessageT& message) {
339     putMessageImpl(message, advisoryMaxQueueSize_);
340   }
341
342   /**
343    * No-throw versions of the above.  Instead returns true on success, false on
344    * failure.
345    *
346    * Only std::overflow_error (the common exception case) and std::runtime_error
347    * (which indicates that the queue is being drained) are prevented from being
348    * thrown. User code must still catch std::bad_alloc errors.
349    */
350   bool tryPutMessageNoThrow(MessageT&& message) {
351     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
352   }
353   bool tryPutMessageNoThrow(const MessageT& message) {
354     return putMessageImpl(message, advisoryMaxQueueSize_, false);
355   }
356
357   /**
358    * Unconditionally put a message on the queue.
359    *
360    * This method is like tryPutMessage(), but ignores the maximum queue size
361    * and always puts the message on the queue, even if the maximum queue size
362    * would be exceeded.
363    *
364    * putMessage() may throw
365    *   - std::bad_alloc if memory allocation fails, and may
366    *   - std::runtime_error if the queue is currently draining
367    *   - any other exception thrown by the MessageT move/copy constructor.
368    */
369   void putMessage(MessageT&& message) {
370     putMessageImpl(std::move(message), 0);
371   }
372   void putMessage(const MessageT& message) {
373     putMessageImpl(message, 0);
374   }
375
376   /**
377    * Put several messages on the queue.
378    */
379   template<typename InputIteratorT>
380   void putMessages(InputIteratorT first, InputIteratorT last) {
381     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
382       IterCategory;
383     putMessagesImpl(first, last, IterCategory());
384   }
385
386   /**
387    * Try to immediately pull a message off of the queue, without blocking.
388    *
389    * If a message is immediately available, the result parameter will be
390    * updated to contain the message contents and true will be returned.
391    *
392    * If no message is available, false will be returned and result will be left
393    * unmodified.
394    */
395   bool tryConsume(MessageT& result) {
396     checkPid();
397
398     try {
399
400       folly::SpinLockGuard g(spinlock_);
401
402       if (UNLIKELY(queue_.empty())) {
403         return false;
404       }
405
406       auto data = std::move(queue_.front());
407       result = data.first;
408       RequestContext::setContext(data.second);
409
410       queue_.pop_front();
411     } catch (...) {
412       // Handle an exception if the assignment operator happens to throw.
413       // We consumed an event but weren't able to pop the message off the
414       // queue.  Signal the event again since the message is still in the
415       // queue.
416       signalEvent(1);
417       throw;
418     }
419
420     return true;
421   }
422
423   size_t size() const {
424     folly::SpinLockGuard g(spinlock_);
425     return queue_.size();
426   }
427
428   /**
429    * Check that the NotificationQueue is being used from the correct process.
430    *
431    * If you create a NotificationQueue in one process, then fork, and try to
432    * send messages to the queue from the child process, you're going to have a
433    * bad time.  Unfortunately users have (accidentally) run into this.
434    *
435    * Because we use an eventfd/pipe, the child process can actually signal the
436    * parent process that an event is ready.  However, it can't put anything on
437    * the parent's queue, so the parent wakes up and finds an empty queue.  This
438    * check ensures that we catch the problem in the misbehaving child process
439    * code, and crash before signalling the parent process.
440    */
441   void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
442
443  private:
444   // Forbidden copy constructor and assignment operator
445   NotificationQueue(NotificationQueue const &) = delete;
446   NotificationQueue& operator=(NotificationQueue const &) = delete;
447
448   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
449     DCHECK(0 == spinlock_.trylock());
450     if (maxSize > 0 && queue_.size() >= maxSize) {
451       if (throws) {
452         throw std::overflow_error("unable to add message to NotificationQueue: "
453                                   "queue is full");
454       }
455       return false;
456     }
457     return true;
458   }
459
460   inline bool checkDraining(bool throws=true) {
461     if (UNLIKELY(draining_ && throws)) {
462       throw std::runtime_error("queue is draining, cannot add message");
463     }
464     return draining_;
465   }
466
467 #ifdef __ANDROID__
468   // TODO 10860938 Remove after figuring out crash
469   mutable std::atomic<int> eventBytes_{0};
470   mutable std::atomic<int> maxEventBytes_{0};
471 #endif
472
473   inline void signalEvent(size_t numAdded = 1) const {
474     static const uint8_t kPipeMessage[] = {
475       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
476     };
477
478     ssize_t bytes_written = 0;
479     ssize_t bytes_expected = 0;
480     if (eventfd_ >= 0) {
481       // eventfd(2) dictates that we must write a 64-bit integer
482       uint64_t numAdded64(numAdded);
483       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
484       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
485     } else {
486       // pipe semantics, add one message for each numAdded
487       bytes_expected = numAdded;
488       do {
489         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
490         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
491         if (rc < 0) {
492           // TODO: if the pipe is full, write will fail with EAGAIN.
493           // See task #1044651 for how this could be handled
494           break;
495         }
496         numAdded -= rc;
497         bytes_written += rc;
498       } while (numAdded > 0);
499     }
500 #ifdef __ANDROID__
501     eventBytes_ += bytes_written;
502     maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
503 #endif
504
505     if (bytes_written != bytes_expected) {
506 #ifdef __ANDROID__
507       LOG(ERROR) << "NotificationQueue Write Error=" << errno
508                  << " bytesInPipe=" << eventBytes_
509                  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
510 #endif
511       folly::throwSystemError("failed to signal NotificationQueue after "
512                               "write", errno);
513     }
514   }
515
516   bool tryConsumeEvent() {
517     uint64_t value = 0;
518     ssize_t rc = -1;
519     if (eventfd_ >= 0) {
520       rc = readNoInt(eventfd_, &value, sizeof(value));
521     } else {
522       uint8_t value8;
523       rc = readNoInt(pipeFds_[0], &value8, sizeof(value8));
524       value = value8;
525 #ifdef __ANDROID__
526       eventBytes_ -= 1;
527 #endif
528     }
529     if (rc < 0) {
530       // EAGAIN should pretty much be the only error we can ever get.
531       // This means someone else already processed the only available message.
532       CHECK_EQ(errno, EAGAIN);
533       return false;
534     }
535     assert(value == 1);
536     return true;
537   }
538
539   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
540     checkPid();
541     bool signal = false;
542     {
543       folly::SpinLockGuard g(spinlock_);
544       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
545         return false;
546       }
547       // We only need to signal an event if not all consumers are
548       // awake.
549       if (numActiveConsumers_ < numConsumers_) {
550         signal = true;
551       }
552       queue_.emplace_back(std::move(message), RequestContext::saveContext());
553     }
554     if (signal) {
555       signalEvent();
556     }
557     return true;
558   }
559
560   bool putMessageImpl(
561     const MessageT& message, size_t maxSize, bool throws=true) {
562     checkPid();
563     bool signal = false;
564     {
565       folly::SpinLockGuard g(spinlock_);
566       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
567         return false;
568       }
569       if (numActiveConsumers_ < numConsumers_) {
570         signal = true;
571       }
572       queue_.emplace_back(message, RequestContext::saveContext());
573     }
574     if (signal) {
575       signalEvent();
576     }
577     return true;
578   }
579
580   template<typename InputIteratorT>
581   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
582                        std::input_iterator_tag) {
583     checkPid();
584     bool signal = false;
585     size_t numAdded = 0;
586     {
587       folly::SpinLockGuard g(spinlock_);
588       checkDraining();
589       while (first != last) {
590         queue_.emplace_back(*first, RequestContext::saveContext());
591         ++first;
592         ++numAdded;
593       }
594       if (numActiveConsumers_ < numConsumers_) {
595         signal = true;
596       }
597     }
598     if (signal) {
599       signalEvent();
600     }
601   }
602
603   mutable folly::SpinLock spinlock_;
604   int eventfd_;
605   int pipeFds_[2]; // to fallback to on older/non-linux systems
606   uint32_t advisoryMaxQueueSize_;
607   pid_t pid_;
608   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
609   int numConsumers_{0};
610   std::atomic<int> numActiveConsumers_{0};
611   bool draining_{false};
612 };
613
614 template<typename MessageT>
615 void NotificationQueue<MessageT>::Consumer::destroy() {
616   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
617   // will be non-nullptr.  Mark the value that it points to, so that
618   // handlerReady() will know the callback is destroyed, and that it cannot
619   // access any member variables anymore.
620   if (destroyedFlagPtr_) {
621     *destroyedFlagPtr_ = true;
622   }
623   stopConsuming();
624   DelayedDestruction::destroy();
625 }
626
627 template<typename MessageT>
628 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
629     noexcept {
630   consumeMessages(false);
631 }
632
633 template<typename MessageT>
634 void NotificationQueue<MessageT>::Consumer::consumeMessages(
635     bool isDrain, size_t* numConsumed) noexcept {
636   DestructorGuard dg(this);
637   uint32_t numProcessed = 0;
638   bool firstRun = true;
639   setActive(true);
640   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
641   SCOPE_EXIT {
642     if (numConsumed != nullptr) {
643       *numConsumed = numProcessed;
644     }
645   };
646   while (true) {
647     // Try to decrement the eventfd.
648     //
649     // The eventfd is only used to wake up the consumer - there may or
650     // may not actually be an event available (another consumer may
651     // have read it).  We don't really care, we only care about
652     // emptying the queue.
653     if (!isDrain && firstRun) {
654       queue_->tryConsumeEvent();
655       firstRun = false;
656     }
657
658     // Now pop the message off of the queue.
659     //
660     // We have to manually acquire and release the spinlock here, rather than
661     // using SpinLockHolder since the MessageT has to be constructed while
662     // holding the spinlock and available after we release it.  SpinLockHolder
663     // unfortunately doesn't provide a release() method.  (We can't construct
664     // MessageT first since we have no guarantee that MessageT has a default
665     // constructor.
666     queue_->spinlock_.lock();
667     bool locked = true;
668
669     try {
670       if (UNLIKELY(queue_->queue_.empty())) {
671         // If there is no message, we've reached the end of the queue, return.
672         setActive(false);
673         queue_->spinlock_.unlock();
674         return;
675       }
676
677       // Pull a message off the queue.
678       auto& data = queue_->queue_.front();
679
680       MessageT msg(std::move(data.first));
681       auto old_ctx =
682         RequestContext::setContext(data.second);
683       queue_->queue_.pop_front();
684
685       // Check to see if the queue is empty now.
686       // We use this as an optimization to see if we should bother trying to
687       // loop again and read another message after invoking this callback.
688       bool wasEmpty = queue_->queue_.empty();
689       if (wasEmpty) {
690         setActive(false);
691       }
692
693       // Now unlock the spinlock before we invoke the callback.
694       queue_->spinlock_.unlock();
695       locked = false;
696
697       // Call the callback
698       bool callbackDestroyed = false;
699       CHECK(destroyedFlagPtr_ == nullptr);
700       destroyedFlagPtr_ = &callbackDestroyed;
701       messageAvailable(std::move(msg));
702       destroyedFlagPtr_ = nullptr;
703
704       RequestContext::setContext(old_ctx);
705
706       // If the callback was destroyed before it returned, we are done
707       if (callbackDestroyed) {
708         return;
709       }
710
711       // If the callback is no longer installed, we are done.
712       if (queue_ == nullptr) {
713         return;
714       }
715
716       // If we have hit maxReadAtOnce_, we are done.
717       ++numProcessed;
718       if (!isDrain && maxReadAtOnce_ > 0 &&
719           numProcessed >= maxReadAtOnce_) {
720         queue_->signalEvent(1);
721         return;
722       }
723
724       // If the queue was empty before we invoked the callback, it's probable
725       // that it is still empty now.  Just go ahead and return, rather than
726       // looping again and trying to re-read from the eventfd.  (If a new
727       // message had in fact arrived while we were invoking the callback, we
728       // will simply be woken up the next time around the event loop and will
729       // process the message then.)
730       if (wasEmpty) {
731         return;
732       }
733     } catch (const std::exception& ex) {
734       // This catch block is really just to handle the case where the MessageT
735       // constructor throws.  The messageAvailable() callback itself is
736       // declared as noexcept and should never throw.
737       //
738       // If the MessageT constructor does throw we try to handle it as best as
739       // we can, but we can't work miracles.  We will just ignore the error for
740       // now and return.  The next time around the event loop we will end up
741       // trying to read the message again.  If MessageT continues to throw we
742       // will never make forward progress and will keep trying each time around
743       // the event loop.
744       if (locked) {
745         // Unlock the spinlock.
746         queue_->spinlock_.unlock();
747
748         // Push a notification back on the eventfd since we didn't actually
749         // read the message off of the queue.
750         if (!isDrain) {
751           queue_->signalEvent(1);
752         }
753       }
754
755       return;
756     }
757   }
758 }
759
760 template<typename MessageT>
761 void NotificationQueue<MessageT>::Consumer::init(
762     EventBase* eventBase,
763     NotificationQueue* queue) {
764   assert(eventBase->isInEventBaseThread());
765   assert(queue_ == nullptr);
766   assert(!isHandlerRegistered());
767   queue->checkPid();
768
769   base_ = eventBase;
770
771   queue_ = queue;
772
773   {
774     folly::SpinLockGuard g(queue_->spinlock_);
775     queue_->numConsumers_++;
776   }
777   queue_->signalEvent();
778
779   if (queue_->eventfd_ >= 0) {
780     initHandler(eventBase, queue_->eventfd_);
781   } else {
782     initHandler(eventBase, queue_->pipeFds_[0]);
783   }
784 }
785
786 template<typename MessageT>
787 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
788   if (queue_ == nullptr) {
789     assert(!isHandlerRegistered());
790     return;
791   }
792
793   {
794     folly::SpinLockGuard g(queue_->spinlock_);
795     queue_->numConsumers_--;
796     setActive(false);
797   }
798
799   assert(isHandlerRegistered());
800   unregisterHandler();
801   detachEventBase();
802   queue_ = nullptr;
803 }
804
805 template<typename MessageT>
806 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
807     size_t* numConsumed) noexcept {
808   DestructorGuard dg(this);
809   {
810     folly::SpinLockGuard g(queue_->spinlock_);
811     if (queue_->draining_) {
812       return false;
813     }
814     queue_->draining_ = true;
815   }
816   consumeMessages(true, numConsumed);
817   {
818     folly::SpinLockGuard g(queue_->spinlock_);
819     queue_->draining_ = false;
820   }
821   return true;
822 }
823
824 /**
825  * Creates a NotificationQueue::Consumer wrapping a function object
826  * Modeled after AsyncTimeout::make
827  *
828  */
829
830 namespace detail {
831
832 template <typename MessageT, typename TCallback>
833 struct notification_queue_consumer_wrapper
834     : public NotificationQueue<MessageT>::Consumer {
835
836   template <typename UCallback>
837   explicit notification_queue_consumer_wrapper(UCallback&& callback)
838       : callback_(std::forward<UCallback>(callback)) {}
839
840   // we are being stricter here and requiring noexcept for callback
841   void messageAvailable(MessageT&& message) override {
842     static_assert(
843       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
844       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
845     );
846
847     callback_(std::forward<MessageT>(message));
848   }
849
850  private:
851   TCallback callback_;
852 };
853
854 } // namespace detail
855
856 template <typename MessageT>
857 template <typename TCallback>
858 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
859                 DelayedDestruction::Destructor>
860 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
861   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
862                          DelayedDestruction::Destructor>(
863       new detail::notification_queue_consumer_wrapper<
864           MessageT,
865           typename std::decay<TCallback>::type>(
866           std::forward<TCallback>(callback)));
867 }
868
869 } // folly