apply clang-tidy modernize-use-override
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <sys/types.h>
20
21 #include <algorithm>
22 #include <deque>
23 #include <iterator>
24 #include <memory>
25 #include <stdexcept>
26 #include <utility>
27
28 #include <folly/Exception.h>
29 #include <folly/FileUtil.h>
30 #include <folly/io/async/EventBase.h>
31 #include <folly/io/async/EventHandler.h>
32 #include <folly/io/async/DelayedDestruction.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/Likely.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/SpinLock.h>
37 #include <folly/portability/Fcntl.h>
38 #include <folly/portability/Sockets.h>
39 #include <folly/portability/Unistd.h>
40
41 #include <glog/logging.h>
42
43 #if __linux__ && !__ANDROID__
44 #define FOLLY_HAVE_EVENTFD
45 #include <folly/io/async/EventFDWrapper.h>
46 #endif
47
48 namespace folly {
49
50 /**
51  * A producer-consumer queue for passing messages between EventBase threads.
52  *
53  * Messages can be added to the queue from any thread.  Multiple consumers may
54  * listen to the queue from multiple EventBase threads.
55  *
56  * A NotificationQueue may not be destroyed while there are still consumers
57  * registered to receive events from the queue.  It is the user's
58  * responsibility to ensure that all consumers are unregistered before the
59  * queue is destroyed.
60  *
61  * MessageT should be MoveConstructible (i.e., must support either a move
62  * constructor or a copy constructor, or both).  Ideally it's move constructor
63  * (or copy constructor if no move constructor is provided) should never throw
64  * exceptions.  If the constructor may throw, the consumers could end up
65  * spinning trying to move a message off the queue and failing, and then
66  * retrying.
67  */
68 template<typename MessageT>
69 class NotificationQueue {
70  public:
71   /**
72    * A callback interface for consuming messages from the queue as they arrive.
73    */
74   class Consumer : public DelayedDestruction, private EventHandler {
75    public:
76     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
77
78     Consumer()
79       : queue_(nullptr),
80         destroyedFlagPtr_(nullptr),
81         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
82
83     // create a consumer in-place, without the need to build new class
84     template <typename TCallback>
85     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
86         TCallback&& callback);
87
88     /**
89      * messageAvailable() will be invoked whenever a new
90      * message is available from the pipe.
91      */
92     virtual void messageAvailable(MessageT&& message) noexcept = 0;
93
94     /**
95      * Begin consuming messages from the specified queue.
96      *
97      * messageAvailable() will be called whenever a message is available.  This
98      * consumer will continue to consume messages until stopConsuming() is
99      * called.
100      *
101      * A Consumer may only consume messages from a single NotificationQueue at
102      * a time.  startConsuming() should not be called if this consumer is
103      * already consuming.
104      */
105     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
106       init(eventBase, queue);
107       registerHandler(READ | PERSIST);
108     }
109
110     /**
111      * Same as above but registers this event handler as internal so that it
112      * doesn't count towards the pending reader count for the IOLoop.
113      */
114     void startConsumingInternal(
115         EventBase* eventBase, NotificationQueue* queue) {
116       init(eventBase, queue);
117       registerInternalHandler(READ | PERSIST);
118     }
119
120     /**
121      * Stop consuming messages.
122      *
123      * startConsuming() may be called again to resume consumption of messages
124      * at a later point in time.
125      */
126     void stopConsuming();
127
128     /**
129      * Consume messages off the queue until it is empty. No messages may be
130      * added to the queue while it is draining, so that the process is bounded.
131      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
132      * and tryPutMessageNoThrow will return false.
133      *
134      * @returns true if the queue was drained, false otherwise. In practice,
135      * this will only fail if someone else is already draining the queue.
136      */
137     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
138
139     /**
140      * Get the NotificationQueue that this consumer is currently consuming
141      * messages from.  Returns nullptr if the consumer is not currently
142      * consuming events from any queue.
143      */
144     NotificationQueue* getCurrentQueue() const {
145       return queue_;
146     }
147
148     /**
149      * Set a limit on how many messages this consumer will read each iteration
150      * around the event loop.
151      *
152      * This helps rate-limit how much work the Consumer will do each event loop
153      * iteration, to prevent it from starving other event handlers.
154      *
155      * A limit of 0 means no limit will be enforced.  If unset, the limit
156      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
157      */
158     void setMaxReadAtOnce(uint32_t maxAtOnce) {
159       maxReadAtOnce_ = maxAtOnce;
160     }
161     uint32_t getMaxReadAtOnce() const {
162       return maxReadAtOnce_;
163     }
164
165     EventBase* getEventBase() {
166       return base_;
167     }
168
169     void handlerReady(uint16_t events) noexcept override;
170
171    protected:
172
173     void destroy() override;
174
175     ~Consumer() override {}
176
177    private:
178     /**
179      * Consume messages off the the queue until
180      *   - the queue is empty (1), or
181      *   - until the consumer is destroyed, or
182      *   - until the consumer is uninstalled, or
183      *   - an exception is thrown in the course of dequeueing, or
184      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
185      *
186      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
187      */
188     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
189
190     void setActive(bool active, bool shouldLock = false) {
191       if (!queue_) {
192         active_ = active;
193         return;
194       }
195       if (shouldLock) {
196         queue_->spinlock_.lock();
197       }
198       if (!active_ && active) {
199         ++queue_->numActiveConsumers_;
200       } else if (active_ && !active) {
201         --queue_->numActiveConsumers_;
202       }
203       active_ = active;
204       if (shouldLock) {
205         queue_->spinlock_.unlock();
206       }
207     }
208     void init(EventBase* eventBase, NotificationQueue* queue);
209
210     NotificationQueue* queue_;
211     bool* destroyedFlagPtr_;
212     uint32_t maxReadAtOnce_;
213     EventBase* base_;
214     bool active_{false};
215   };
216
217   class SimpleConsumer {
218    public:
219     explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
220       ++queue_.numConsumers_;
221     }
222
223     ~SimpleConsumer() {
224       --queue_.numConsumers_;
225     }
226
227     int getFd() const {
228       return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
229     }
230
231    private:
232     NotificationQueue& queue_;
233   };
234
235   enum class FdType {
236     PIPE,
237 #ifdef FOLLY_HAVE_EVENTFD
238     EVENTFD,
239 #endif
240   };
241
242   /**
243    * Create a new NotificationQueue.
244    *
245    * If the maxSize parameter is specified, this sets the maximum queue size
246    * that will be enforced by tryPutMessage().  (This size is advisory, and may
247    * be exceeded if producers explicitly use putMessage() instead of
248    * tryPutMessage().)
249    *
250    * The fdType parameter determines the type of file descriptor used
251    * internally to signal message availability.  The default (eventfd) is
252    * preferable for performance and because it won't fail when the queue gets
253    * too long.  It is not available on on older and non-linux kernels, however.
254    * In this case the code will fall back to using a pipe, the parameter is
255    * mostly for testing purposes.
256    */
257   explicit NotificationQueue(uint32_t maxSize = 0,
258 #ifdef FOLLY_HAVE_EVENTFD
259                              FdType fdType = FdType::EVENTFD)
260 #else
261                              FdType fdType = FdType::PIPE)
262 #endif
263       : eventfd_(-1),
264         pipeFds_{-1, -1},
265         advisoryMaxQueueSize_(maxSize),
266         pid_(pid_t(getpid())),
267         queue_() {
268
269     RequestContext::saveContext();
270
271 #ifdef FOLLY_HAVE_EVENTFD
272     if (fdType == FdType::EVENTFD) {
273       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
274       if (eventfd_ == -1) {
275         if (errno == ENOSYS || errno == EINVAL) {
276           // eventfd not availalble
277           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
278                      << errno << ", falling back to pipe mode (is your kernel "
279                      << "> 2.6.30?)";
280           fdType = FdType::PIPE;
281         } else {
282           // some other error
283           folly::throwSystemError("Failed to create eventfd for "
284                                   "NotificationQueue", errno);
285         }
286       }
287     }
288 #endif
289     if (fdType == FdType::PIPE) {
290       if (pipe(pipeFds_)) {
291         folly::throwSystemError("Failed to create pipe for NotificationQueue",
292                                 errno);
293       }
294       try {
295         // put both ends of the pipe into non-blocking mode
296         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
297           folly::throwSystemError("failed to put NotificationQueue pipe read "
298                                   "endpoint into non-blocking mode", errno);
299         }
300         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
301           folly::throwSystemError("failed to put NotificationQueue pipe write "
302                                   "endpoint into non-blocking mode", errno);
303         }
304       } catch (...) {
305         ::close(pipeFds_[0]);
306         ::close(pipeFds_[1]);
307         throw;
308       }
309     }
310   }
311
312   ~NotificationQueue() {
313     if (eventfd_ >= 0) {
314       ::close(eventfd_);
315       eventfd_ = -1;
316     }
317     if (pipeFds_[0] >= 0) {
318       ::close(pipeFds_[0]);
319       pipeFds_[0] = -1;
320     }
321     if (pipeFds_[1] >= 0) {
322       ::close(pipeFds_[1]);
323       pipeFds_[1] = -1;
324     }
325   }
326
327   /**
328    * Set the advisory maximum queue size.
329    *
330    * This maximum queue size affects calls to tryPutMessage().  Message
331    * producers can still use the putMessage() call to unconditionally put a
332    * message on the queue, ignoring the configured maximum queue size.  This
333    * can cause the queue size to exceed the configured maximum.
334    */
335   void setMaxQueueSize(uint32_t max) {
336     advisoryMaxQueueSize_ = max;
337   }
338
339   /**
340    * Attempt to put a message on the queue if the queue is not already full.
341    *
342    * If the queue is full, a std::overflow_error will be thrown.  The
343    * setMaxQueueSize() function controls the maximum queue size.
344    *
345    * If the queue is currently draining, an std::runtime_error will be thrown.
346    *
347    * This method may contend briefly on a spinlock if many threads are
348    * concurrently accessing the queue, but for all intents and purposes it will
349    * immediately place the message on the queue and return.
350    *
351    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
352    * may throw any other exception thrown by the MessageT move/copy
353    * constructor.
354    */
355   void tryPutMessage(MessageT&& message) {
356     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
357   }
358   void tryPutMessage(const MessageT& message) {
359     putMessageImpl(message, advisoryMaxQueueSize_);
360   }
361
362   /**
363    * No-throw versions of the above.  Instead returns true on success, false on
364    * failure.
365    *
366    * Only std::overflow_error (the common exception case) and std::runtime_error
367    * (which indicates that the queue is being drained) are prevented from being
368    * thrown. User code must still catch std::bad_alloc errors.
369    */
370   bool tryPutMessageNoThrow(MessageT&& message) {
371     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
372   }
373   bool tryPutMessageNoThrow(const MessageT& message) {
374     return putMessageImpl(message, advisoryMaxQueueSize_, false);
375   }
376
377   /**
378    * Unconditionally put a message on the queue.
379    *
380    * This method is like tryPutMessage(), but ignores the maximum queue size
381    * and always puts the message on the queue, even if the maximum queue size
382    * would be exceeded.
383    *
384    * putMessage() may throw
385    *   - std::bad_alloc if memory allocation fails, and may
386    *   - std::runtime_error if the queue is currently draining
387    *   - any other exception thrown by the MessageT move/copy constructor.
388    */
389   void putMessage(MessageT&& message) {
390     putMessageImpl(std::move(message), 0);
391   }
392   void putMessage(const MessageT& message) {
393     putMessageImpl(message, 0);
394   }
395
396   /**
397    * Put several messages on the queue.
398    */
399   template<typename InputIteratorT>
400   void putMessages(InputIteratorT first, InputIteratorT last) {
401     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
402       IterCategory;
403     putMessagesImpl(first, last, IterCategory());
404   }
405
406   /**
407    * Try to immediately pull a message off of the queue, without blocking.
408    *
409    * If a message is immediately available, the result parameter will be
410    * updated to contain the message contents and true will be returned.
411    *
412    * If no message is available, false will be returned and result will be left
413    * unmodified.
414    */
415   bool tryConsume(MessageT& result) {
416     SCOPE_EXIT { syncSignalAndQueue(); };
417
418     checkPid();
419
420     folly::SpinLockGuard g(spinlock_);
421
422     if (UNLIKELY(queue_.empty())) {
423       return false;
424     }
425
426     auto& data = queue_.front();
427     result = std::move(data.first);
428     RequestContext::setContext(std::move(data.second));
429
430     queue_.pop_front();
431
432     return true;
433   }
434
435   size_t size() const {
436     folly::SpinLockGuard g(spinlock_);
437     return queue_.size();
438   }
439
440   /**
441    * Check that the NotificationQueue is being used from the correct process.
442    *
443    * If you create a NotificationQueue in one process, then fork, and try to
444    * send messages to the queue from the child process, you're going to have a
445    * bad time.  Unfortunately users have (accidentally) run into this.
446    *
447    * Because we use an eventfd/pipe, the child process can actually signal the
448    * parent process that an event is ready.  However, it can't put anything on
449    * the parent's queue, so the parent wakes up and finds an empty queue.  This
450    * check ensures that we catch the problem in the misbehaving child process
451    * code, and crash before signalling the parent process.
452    */
453   void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
454
455  private:
456   // Forbidden copy constructor and assignment operator
457   NotificationQueue(NotificationQueue const &) = delete;
458   NotificationQueue& operator=(NotificationQueue const &) = delete;
459
460   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
461     DCHECK(0 == spinlock_.try_lock());
462     if (maxSize > 0 && queue_.size() >= maxSize) {
463       if (throws) {
464         throw std::overflow_error("unable to add message to NotificationQueue: "
465                                   "queue is full");
466       }
467       return false;
468     }
469     return true;
470   }
471
472   inline bool checkDraining(bool throws=true) {
473     if (UNLIKELY(draining_ && throws)) {
474       throw std::runtime_error("queue is draining, cannot add message");
475     }
476     return draining_;
477   }
478
479 #ifdef __ANDROID__
480   // TODO 10860938 Remove after figuring out crash
481   mutable std::atomic<int> eventBytes_{0};
482   mutable std::atomic<int> maxEventBytes_{0};
483 #endif
484
485   void ensureSignalLocked() const {
486     // semantics: empty fd == empty queue <=> !signal_
487     if (signal_) {
488       return;
489     }
490
491     ssize_t bytes_written = 0;
492     size_t bytes_expected = 0;
493
494     do {
495       if (eventfd_ >= 0) {
496         // eventfd(2) dictates that we must write a 64-bit integer
497         uint64_t signal = 1;
498         bytes_expected = sizeof(signal);
499         bytes_written = ::write(eventfd_, &signal, bytes_expected);
500       } else {
501         uint8_t signal = 1;
502         bytes_expected = sizeof(signal);
503         bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
504       }
505     } while (bytes_written == -1 && errno == EINTR);
506
507 #ifdef __ANDROID__
508     if (bytes_written > 0) {
509       eventBytes_ += bytes_written;
510       maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
511     }
512 #endif
513
514     if (bytes_written == ssize_t(bytes_expected)) {
515       signal_ = true;
516     } else {
517 #ifdef __ANDROID__
518       LOG(ERROR) << "NotificationQueue Write Error=" << errno
519                  << " bytesInPipe=" << eventBytes_
520                  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
521 #endif
522       folly::throwSystemError("failed to signal NotificationQueue after "
523                               "write", errno);
524     }
525   }
526
527   void drainSignalsLocked() {
528     ssize_t bytes_read = 0;
529     if (eventfd_ > 0) {
530       uint64_t message;
531       bytes_read = readNoInt(eventfd_, &message, sizeof(message));
532       CHECK(bytes_read != -1 || errno == EAGAIN);
533     } else {
534       // There should only be one byte in the pipe. To avoid potential leaks we still drain.
535       uint8_t message[32];
536       ssize_t result;
537       while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
538         bytes_read += result;
539       }
540       CHECK(result == -1 && errno == EAGAIN);
541       LOG_IF(ERROR, bytes_read > 1)
542         << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
543         << bytes_read << " bytes, expected <= 1";
544     }
545     LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
546       << "[NotificationQueue] Unexpected state while draining signals: signal_="
547       << signal_ << " bytes_read=" << bytes_read;
548
549     signal_ = false;
550
551 #ifdef __ANDROID__
552     if (bytes_read > 0) {
553       eventBytes_ -= bytes_read;
554     }
555 #endif
556   }
557
558   void ensureSignal() const {
559     folly::SpinLockGuard g(spinlock_);
560     ensureSignalLocked();
561   }
562
563   void syncSignalAndQueue() {
564     folly::SpinLockGuard g(spinlock_);
565
566     if (queue_.empty()) {
567       drainSignalsLocked();
568     } else {
569       ensureSignalLocked();
570     }
571   }
572
573   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
574     checkPid();
575     bool signal = false;
576     {
577       folly::SpinLockGuard g(spinlock_);
578       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
579         return false;
580       }
581       // We only need to signal an event if not all consumers are
582       // awake.
583       if (numActiveConsumers_ < numConsumers_) {
584         signal = true;
585       }
586       queue_.emplace_back(std::move(message), RequestContext::saveContext());
587       if (signal) {
588         ensureSignalLocked();
589       }
590     }
591     return true;
592   }
593
594   bool putMessageImpl(
595     const MessageT& message, size_t maxSize, bool throws=true) {
596     checkPid();
597     bool signal = false;
598     {
599       folly::SpinLockGuard g(spinlock_);
600       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
601         return false;
602       }
603       if (numActiveConsumers_ < numConsumers_) {
604         signal = true;
605       }
606       queue_.emplace_back(message, RequestContext::saveContext());
607       if (signal) {
608         ensureSignalLocked();
609       }
610     }
611     return true;
612   }
613
614   template<typename InputIteratorT>
615   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
616                        std::input_iterator_tag) {
617     checkPid();
618     bool signal = false;
619     size_t numAdded = 0;
620     {
621       folly::SpinLockGuard g(spinlock_);
622       checkDraining();
623       while (first != last) {
624         queue_.emplace_back(*first, RequestContext::saveContext());
625         ++first;
626         ++numAdded;
627       }
628       if (numActiveConsumers_ < numConsumers_) {
629         signal = true;
630       }
631       if (signal) {
632         ensureSignalLocked();
633       }
634     }
635   }
636
637   mutable folly::SpinLock spinlock_;
638   mutable bool signal_{false};
639   int eventfd_;
640   int pipeFds_[2]; // to fallback to on older/non-linux systems
641   uint32_t advisoryMaxQueueSize_;
642   pid_t pid_;
643   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
644   int numConsumers_{0};
645   std::atomic<int> numActiveConsumers_{0};
646   bool draining_{false};
647 };
648
649 template<typename MessageT>
650 void NotificationQueue<MessageT>::Consumer::destroy() {
651   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
652   // will be non-nullptr.  Mark the value that it points to, so that
653   // handlerReady() will know the callback is destroyed, and that it cannot
654   // access any member variables anymore.
655   if (destroyedFlagPtr_) {
656     *destroyedFlagPtr_ = true;
657   }
658   stopConsuming();
659   DelayedDestruction::destroy();
660 }
661
662 template<typename MessageT>
663 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
664     noexcept {
665   consumeMessages(false);
666 }
667
668 template<typename MessageT>
669 void NotificationQueue<MessageT>::Consumer::consumeMessages(
670     bool isDrain, size_t* numConsumed) noexcept {
671   DestructorGuard dg(this);
672   uint32_t numProcessed = 0;
673   setActive(true);
674   SCOPE_EXIT {
675     if (queue_) {
676       queue_->syncSignalAndQueue();
677     }
678   };
679   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
680   SCOPE_EXIT {
681     if (numConsumed != nullptr) {
682       *numConsumed = numProcessed;
683     }
684   };
685   while (true) {
686     // Now pop the message off of the queue.
687     //
688     // We have to manually acquire and release the spinlock here, rather than
689     // using SpinLockHolder since the MessageT has to be constructed while
690     // holding the spinlock and available after we release it.  SpinLockHolder
691     // unfortunately doesn't provide a release() method.  (We can't construct
692     // MessageT first since we have no guarantee that MessageT has a default
693     // constructor.
694     queue_->spinlock_.lock();
695     bool locked = true;
696
697     try {
698       if (UNLIKELY(queue_->queue_.empty())) {
699         // If there is no message, we've reached the end of the queue, return.
700         setActive(false);
701         queue_->spinlock_.unlock();
702         return;
703       }
704
705       // Pull a message off the queue.
706       auto& data = queue_->queue_.front();
707
708       MessageT msg(std::move(data.first));
709       RequestContextScopeGuard rctx(std::move(data.second));
710       queue_->queue_.pop_front();
711
712       // Check to see if the queue is empty now.
713       // We use this as an optimization to see if we should bother trying to
714       // loop again and read another message after invoking this callback.
715       bool wasEmpty = queue_->queue_.empty();
716       if (wasEmpty) {
717         setActive(false);
718       }
719
720       // Now unlock the spinlock before we invoke the callback.
721       queue_->spinlock_.unlock();
722       locked = false;
723
724       // Call the callback
725       bool callbackDestroyed = false;
726       CHECK(destroyedFlagPtr_ == nullptr);
727       destroyedFlagPtr_ = &callbackDestroyed;
728       messageAvailable(std::move(msg));
729       destroyedFlagPtr_ = nullptr;
730
731       // If the callback was destroyed before it returned, we are done
732       if (callbackDestroyed) {
733         return;
734       }
735
736       // If the callback is no longer installed, we are done.
737       if (queue_ == nullptr) {
738         return;
739       }
740
741       // If we have hit maxReadAtOnce_, we are done.
742       ++numProcessed;
743       if (!isDrain && maxReadAtOnce_ > 0 &&
744           numProcessed >= maxReadAtOnce_) {
745         return;
746       }
747
748       // If the queue was empty before we invoked the callback, it's probable
749       // that it is still empty now.  Just go ahead and return, rather than
750       // looping again and trying to re-read from the eventfd.  (If a new
751       // message had in fact arrived while we were invoking the callback, we
752       // will simply be woken up the next time around the event loop and will
753       // process the message then.)
754       if (wasEmpty) {
755         return;
756       }
757     } catch (const std::exception&) {
758       // This catch block is really just to handle the case where the MessageT
759       // constructor throws.  The messageAvailable() callback itself is
760       // declared as noexcept and should never throw.
761       //
762       // If the MessageT constructor does throw we try to handle it as best as
763       // we can, but we can't work miracles.  We will just ignore the error for
764       // now and return.  The next time around the event loop we will end up
765       // trying to read the message again.  If MessageT continues to throw we
766       // will never make forward progress and will keep trying each time around
767       // the event loop.
768       if (locked) {
769         // Unlock the spinlock.
770         queue_->spinlock_.unlock();
771       }
772
773       return;
774     }
775   }
776 }
777
778 template<typename MessageT>
779 void NotificationQueue<MessageT>::Consumer::init(
780     EventBase* eventBase,
781     NotificationQueue* queue) {
782   assert(eventBase->isInEventBaseThread());
783   assert(queue_ == nullptr);
784   assert(!isHandlerRegistered());
785   queue->checkPid();
786
787   base_ = eventBase;
788
789   queue_ = queue;
790
791   {
792     folly::SpinLockGuard g(queue_->spinlock_);
793     queue_->numConsumers_++;
794   }
795   queue_->ensureSignal();
796
797   if (queue_->eventfd_ >= 0) {
798     initHandler(eventBase, queue_->eventfd_);
799   } else {
800     initHandler(eventBase, queue_->pipeFds_[0]);
801   }
802 }
803
804 template<typename MessageT>
805 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
806   if (queue_ == nullptr) {
807     assert(!isHandlerRegistered());
808     return;
809   }
810
811   {
812     folly::SpinLockGuard g(queue_->spinlock_);
813     queue_->numConsumers_--;
814     setActive(false);
815   }
816
817   assert(isHandlerRegistered());
818   unregisterHandler();
819   detachEventBase();
820   queue_ = nullptr;
821 }
822
823 template<typename MessageT>
824 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
825     size_t* numConsumed) noexcept {
826   DestructorGuard dg(this);
827   {
828     folly::SpinLockGuard g(queue_->spinlock_);
829     if (queue_->draining_) {
830       return false;
831     }
832     queue_->draining_ = true;
833   }
834   consumeMessages(true, numConsumed);
835   {
836     folly::SpinLockGuard g(queue_->spinlock_);
837     queue_->draining_ = false;
838   }
839   return true;
840 }
841
842 /**
843  * Creates a NotificationQueue::Consumer wrapping a function object
844  * Modeled after AsyncTimeout::make
845  *
846  */
847
848 namespace detail {
849
850 template <typename MessageT, typename TCallback>
851 struct notification_queue_consumer_wrapper
852     : public NotificationQueue<MessageT>::Consumer {
853
854   template <typename UCallback>
855   explicit notification_queue_consumer_wrapper(UCallback&& callback)
856       : callback_(std::forward<UCallback>(callback)) {}
857
858   // we are being stricter here and requiring noexcept for callback
859   void messageAvailable(MessageT&& message) noexcept override {
860     static_assert(
861       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
862       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
863     );
864
865     callback_(std::forward<MessageT>(message));
866   }
867
868  private:
869   TCallback callback_;
870 };
871
872 } // namespace detail
873
874 template <typename MessageT>
875 template <typename TCallback>
876 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
877                 DelayedDestruction::Destructor>
878 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
879   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
880                          DelayedDestruction::Destructor>(
881       new detail::notification_queue_consumer_wrapper<
882           MessageT,
883           typename std::decay<TCallback>::type>(
884           std::forward<TCallback>(callback)));
885 }
886
887 } // folly