28a2878eb81ecde122f68b5e440d870bdb0859bc
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <sys/types.h>
20
21 #include <algorithm>
22 #include <deque>
23 #include <iterator>
24 #include <memory>
25 #include <stdexcept>
26 #include <utility>
27
28 #include <folly/FileUtil.h>
29 #include <folly/io/async/EventBase.h>
30 #include <folly/io/async/EventHandler.h>
31 #include <folly/io/async/DelayedDestruction.h>
32 #include <folly/io/async/Request.h>
33 #include <folly/Likely.h>
34 #include <folly/ScopeGuard.h>
35 #include <folly/SpinLock.h>
36 #include <folly/portability/Fcntl.h>
37 #include <folly/portability/Sockets.h>
38 #include <folly/portability/Unistd.h>
39
40 #include <glog/logging.h>
41
42 #if __linux__ && !__ANDROID__
43 #define FOLLY_HAVE_EVENTFD
44 #include <folly/io/async/EventFDWrapper.h>
45 #endif
46
47 namespace folly {
48
49 /**
50  * A producer-consumer queue for passing messages between EventBase threads.
51  *
52  * Messages can be added to the queue from any thread.  Multiple consumers may
53  * listen to the queue from multiple EventBase threads.
54  *
55  * A NotificationQueue may not be destroyed while there are still consumers
56  * registered to receive events from the queue.  It is the user's
57  * responsibility to ensure that all consumers are unregistered before the
58  * queue is destroyed.
59  *
60  * MessageT should be MoveConstructible (i.e., must support either a move
61  * constructor or a copy constructor, or both).  Ideally it's move constructor
62  * (or copy constructor if no move constructor is provided) should never throw
63  * exceptions.  If the constructor may throw, the consumers could end up
64  * spinning trying to move a message off the queue and failing, and then
65  * retrying.
66  */
67 template<typename MessageT>
68 class NotificationQueue {
69  public:
70   /**
71    * A callback interface for consuming messages from the queue as they arrive.
72    */
73   class Consumer : public DelayedDestruction, private EventHandler {
74    public:
75     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
76
77     Consumer()
78       : queue_(nullptr),
79         destroyedFlagPtr_(nullptr),
80         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
81
82     // create a consumer in-place, without the need to build new class
83     template <typename TCallback>
84     static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
85         TCallback&& callback);
86
87     /**
88      * messageAvailable() will be invoked whenever a new
89      * message is available from the pipe.
90      */
91     virtual void messageAvailable(MessageT&& message) noexcept = 0;
92
93     /**
94      * Begin consuming messages from the specified queue.
95      *
96      * messageAvailable() will be called whenever a message is available.  This
97      * consumer will continue to consume messages until stopConsuming() is
98      * called.
99      *
100      * A Consumer may only consume messages from a single NotificationQueue at
101      * a time.  startConsuming() should not be called if this consumer is
102      * already consuming.
103      */
104     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
105       init(eventBase, queue);
106       registerHandler(READ | PERSIST);
107     }
108
109     /**
110      * Same as above but registers this event handler as internal so that it
111      * doesn't count towards the pending reader count for the IOLoop.
112      */
113     void startConsumingInternal(
114         EventBase* eventBase, NotificationQueue* queue) {
115       init(eventBase, queue);
116       registerInternalHandler(READ | PERSIST);
117     }
118
119     /**
120      * Stop consuming messages.
121      *
122      * startConsuming() may be called again to resume consumption of messages
123      * at a later point in time.
124      */
125     void stopConsuming();
126
127     /**
128      * Consume messages off the queue until it is empty. No messages may be
129      * added to the queue while it is draining, so that the process is bounded.
130      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
131      * and tryPutMessageNoThrow will return false.
132      *
133      * @returns true if the queue was drained, false otherwise. In practice,
134      * this will only fail if someone else is already draining the queue.
135      */
136     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
137
138     /**
139      * Get the NotificationQueue that this consumer is currently consuming
140      * messages from.  Returns nullptr if the consumer is not currently
141      * consuming events from any queue.
142      */
143     NotificationQueue* getCurrentQueue() const {
144       return queue_;
145     }
146
147     /**
148      * Set a limit on how many messages this consumer will read each iteration
149      * around the event loop.
150      *
151      * This helps rate-limit how much work the Consumer will do each event loop
152      * iteration, to prevent it from starving other event handlers.
153      *
154      * A limit of 0 means no limit will be enforced.  If unset, the limit
155      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
156      */
157     void setMaxReadAtOnce(uint32_t maxAtOnce) {
158       maxReadAtOnce_ = maxAtOnce;
159     }
160     uint32_t getMaxReadAtOnce() const {
161       return maxReadAtOnce_;
162     }
163
164     EventBase* getEventBase() {
165       return base_;
166     }
167
168     void handlerReady(uint16_t events) noexcept override;
169
170    protected:
171
172     void destroy() override;
173
174     virtual ~Consumer() {}
175
176    private:
177     /**
178      * Consume messages off the the queue until
179      *   - the queue is empty (1), or
180      *   - until the consumer is destroyed, or
181      *   - until the consumer is uninstalled, or
182      *   - an exception is thrown in the course of dequeueing, or
183      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
184      *
185      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
186      */
187     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
188
189     void setActive(bool active, bool shouldLock = false) {
190       if (!queue_) {
191         active_ = active;
192         return;
193       }
194       if (shouldLock) {
195         queue_->spinlock_.lock();
196       }
197       if (!active_ && active) {
198         ++queue_->numActiveConsumers_;
199       } else if (active_ && !active) {
200         --queue_->numActiveConsumers_;
201       }
202       active_ = active;
203       if (shouldLock) {
204         queue_->spinlock_.unlock();
205       }
206     }
207     void init(EventBase* eventBase, NotificationQueue* queue);
208
209     NotificationQueue* queue_;
210     bool* destroyedFlagPtr_;
211     uint32_t maxReadAtOnce_;
212     EventBase* base_;
213     bool active_{false};
214   };
215
216   class SimpleConsumer {
217    public:
218     explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
219       ++queue_.numConsumers_;
220     }
221
222     ~SimpleConsumer() {
223       --queue_.numConsumers_;
224     }
225
226     int getFd() const {
227       return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
228     }
229
230    private:
231     NotificationQueue& queue_;
232   };
233
234   enum class FdType {
235     PIPE,
236 #ifdef FOLLY_HAVE_EVENTFD
237     EVENTFD,
238 #endif
239   };
240
241   /**
242    * Create a new NotificationQueue.
243    *
244    * If the maxSize parameter is specified, this sets the maximum queue size
245    * that will be enforced by tryPutMessage().  (This size is advisory, and may
246    * be exceeded if producers explicitly use putMessage() instead of
247    * tryPutMessage().)
248    *
249    * The fdType parameter determines the type of file descriptor used
250    * internally to signal message availability.  The default (eventfd) is
251    * preferable for performance and because it won't fail when the queue gets
252    * too long.  It is not available on on older and non-linux kernels, however.
253    * In this case the code will fall back to using a pipe, the parameter is
254    * mostly for testing purposes.
255    */
256   explicit NotificationQueue(uint32_t maxSize = 0,
257 #ifdef FOLLY_HAVE_EVENTFD
258                              FdType fdType = FdType::EVENTFD)
259 #else
260                              FdType fdType = FdType::PIPE)
261 #endif
262       : eventfd_(-1),
263         pipeFds_{-1, -1},
264         advisoryMaxQueueSize_(maxSize),
265         pid_(pid_t(getpid())),
266         queue_() {
267
268     RequestContext::saveContext();
269
270 #ifdef FOLLY_HAVE_EVENTFD
271     if (fdType == FdType::EVENTFD) {
272       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
273       if (eventfd_ == -1) {
274         if (errno == ENOSYS || errno == EINVAL) {
275           // eventfd not availalble
276           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
277                      << errno << ", falling back to pipe mode (is your kernel "
278                      << "> 2.6.30?)";
279           fdType = FdType::PIPE;
280         } else {
281           // some other error
282           folly::throwSystemError("Failed to create eventfd for "
283                                   "NotificationQueue", errno);
284         }
285       }
286     }
287 #endif
288     if (fdType == FdType::PIPE) {
289       if (pipe(pipeFds_)) {
290         folly::throwSystemError("Failed to create pipe for NotificationQueue",
291                                 errno);
292       }
293       try {
294         // put both ends of the pipe into non-blocking mode
295         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
296           folly::throwSystemError("failed to put NotificationQueue pipe read "
297                                   "endpoint into non-blocking mode", errno);
298         }
299         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
300           folly::throwSystemError("failed to put NotificationQueue pipe write "
301                                   "endpoint into non-blocking mode", errno);
302         }
303       } catch (...) {
304         ::close(pipeFds_[0]);
305         ::close(pipeFds_[1]);
306         throw;
307       }
308     }
309   }
310
311   ~NotificationQueue() {
312     if (eventfd_ >= 0) {
313       ::close(eventfd_);
314       eventfd_ = -1;
315     }
316     if (pipeFds_[0] >= 0) {
317       ::close(pipeFds_[0]);
318       pipeFds_[0] = -1;
319     }
320     if (pipeFds_[1] >= 0) {
321       ::close(pipeFds_[1]);
322       pipeFds_[1] = -1;
323     }
324   }
325
326   /**
327    * Set the advisory maximum queue size.
328    *
329    * This maximum queue size affects calls to tryPutMessage().  Message
330    * producers can still use the putMessage() call to unconditionally put a
331    * message on the queue, ignoring the configured maximum queue size.  This
332    * can cause the queue size to exceed the configured maximum.
333    */
334   void setMaxQueueSize(uint32_t max) {
335     advisoryMaxQueueSize_ = max;
336   }
337
338   /**
339    * Attempt to put a message on the queue if the queue is not already full.
340    *
341    * If the queue is full, a std::overflow_error will be thrown.  The
342    * setMaxQueueSize() function controls the maximum queue size.
343    *
344    * If the queue is currently draining, an std::runtime_error will be thrown.
345    *
346    * This method may contend briefly on a spinlock if many threads are
347    * concurrently accessing the queue, but for all intents and purposes it will
348    * immediately place the message on the queue and return.
349    *
350    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
351    * may throw any other exception thrown by the MessageT move/copy
352    * constructor.
353    */
354   void tryPutMessage(MessageT&& message) {
355     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
356   }
357   void tryPutMessage(const MessageT& message) {
358     putMessageImpl(message, advisoryMaxQueueSize_);
359   }
360
361   /**
362    * No-throw versions of the above.  Instead returns true on success, false on
363    * failure.
364    *
365    * Only std::overflow_error (the common exception case) and std::runtime_error
366    * (which indicates that the queue is being drained) are prevented from being
367    * thrown. User code must still catch std::bad_alloc errors.
368    */
369   bool tryPutMessageNoThrow(MessageT&& message) {
370     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
371   }
372   bool tryPutMessageNoThrow(const MessageT& message) {
373     return putMessageImpl(message, advisoryMaxQueueSize_, false);
374   }
375
376   /**
377    * Unconditionally put a message on the queue.
378    *
379    * This method is like tryPutMessage(), but ignores the maximum queue size
380    * and always puts the message on the queue, even if the maximum queue size
381    * would be exceeded.
382    *
383    * putMessage() may throw
384    *   - std::bad_alloc if memory allocation fails, and may
385    *   - std::runtime_error if the queue is currently draining
386    *   - any other exception thrown by the MessageT move/copy constructor.
387    */
388   void putMessage(MessageT&& message) {
389     putMessageImpl(std::move(message), 0);
390   }
391   void putMessage(const MessageT& message) {
392     putMessageImpl(message, 0);
393   }
394
395   /**
396    * Put several messages on the queue.
397    */
398   template<typename InputIteratorT>
399   void putMessages(InputIteratorT first, InputIteratorT last) {
400     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
401       IterCategory;
402     putMessagesImpl(first, last, IterCategory());
403   }
404
405   /**
406    * Try to immediately pull a message off of the queue, without blocking.
407    *
408    * If a message is immediately available, the result parameter will be
409    * updated to contain the message contents and true will be returned.
410    *
411    * If no message is available, false will be returned and result will be left
412    * unmodified.
413    */
414   bool tryConsume(MessageT& result) {
415     SCOPE_EXIT { syncSignalAndQueue(); };
416
417     checkPid();
418
419     folly::SpinLockGuard g(spinlock_);
420
421     if (UNLIKELY(queue_.empty())) {
422       return false;
423     }
424
425     auto& data = queue_.front();
426     result = std::move(data.first);
427     RequestContext::setContext(std::move(data.second));
428
429     queue_.pop_front();
430
431     return true;
432   }
433
434   size_t size() const {
435     folly::SpinLockGuard g(spinlock_);
436     return queue_.size();
437   }
438
439   /**
440    * Check that the NotificationQueue is being used from the correct process.
441    *
442    * If you create a NotificationQueue in one process, then fork, and try to
443    * send messages to the queue from the child process, you're going to have a
444    * bad time.  Unfortunately users have (accidentally) run into this.
445    *
446    * Because we use an eventfd/pipe, the child process can actually signal the
447    * parent process that an event is ready.  However, it can't put anything on
448    * the parent's queue, so the parent wakes up and finds an empty queue.  This
449    * check ensures that we catch the problem in the misbehaving child process
450    * code, and crash before signalling the parent process.
451    */
452   void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
453
454  private:
455   // Forbidden copy constructor and assignment operator
456   NotificationQueue(NotificationQueue const &) = delete;
457   NotificationQueue& operator=(NotificationQueue const &) = delete;
458
459   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
460     DCHECK(0 == spinlock_.try_lock());
461     if (maxSize > 0 && queue_.size() >= maxSize) {
462       if (throws) {
463         throw std::overflow_error("unable to add message to NotificationQueue: "
464                                   "queue is full");
465       }
466       return false;
467     }
468     return true;
469   }
470
471   inline bool checkDraining(bool throws=true) {
472     if (UNLIKELY(draining_ && throws)) {
473       throw std::runtime_error("queue is draining, cannot add message");
474     }
475     return draining_;
476   }
477
478 #ifdef __ANDROID__
479   // TODO 10860938 Remove after figuring out crash
480   mutable std::atomic<int> eventBytes_{0};
481   mutable std::atomic<int> maxEventBytes_{0};
482 #endif
483
484   void ensureSignalLocked() const {
485     // semantics: empty fd == empty queue <=> !signal_
486     if (signal_) {
487       return;
488     }
489
490     ssize_t bytes_written = 0;
491     size_t bytes_expected = 0;
492
493     do {
494       if (eventfd_ >= 0) {
495         // eventfd(2) dictates that we must write a 64-bit integer
496         uint64_t signal = 1;
497         bytes_expected = sizeof(signal);
498         bytes_written = ::write(eventfd_, &signal, bytes_expected);
499       } else {
500         uint8_t signal = 1;
501         bytes_expected = sizeof(signal);
502         bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
503       }
504     } while (bytes_written == -1 && errno == EINTR);
505
506 #ifdef __ANDROID__
507     if (bytes_written > 0) {
508       eventBytes_ += bytes_written;
509       maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
510     }
511 #endif
512
513     if (bytes_written == ssize_t(bytes_expected)) {
514       signal_ = true;
515     } else {
516 #ifdef __ANDROID__
517       LOG(ERROR) << "NotificationQueue Write Error=" << errno
518                  << " bytesInPipe=" << eventBytes_
519                  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
520 #endif
521       folly::throwSystemError("failed to signal NotificationQueue after "
522                               "write", errno);
523     }
524   }
525
526   void drainSignalsLocked() {
527     ssize_t bytes_read = 0;
528     if (eventfd_ > 0) {
529       uint64_t message;
530       bytes_read = readNoInt(eventfd_, &message, sizeof(message));
531       CHECK(bytes_read != -1 || errno == EAGAIN);
532     } else {
533       // There should only be one byte in the pipe. To avoid potential leaks we still drain.
534       uint8_t message[32];
535       ssize_t result;
536       while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
537         bytes_read += result;
538       }
539       CHECK(result == -1 && errno == EAGAIN);
540       LOG_IF(ERROR, bytes_read > 1)
541         << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
542         << bytes_read << " bytes, expected <= 1";
543     }
544     LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
545       << "[NotificationQueue] Unexpected state while draining signals: signal_="
546       << signal_ << " bytes_read=" << bytes_read;
547
548     signal_ = false;
549
550 #ifdef __ANDROID__
551     if (bytes_read > 0) {
552       eventBytes_ -= bytes_read;
553     }
554 #endif
555   }
556
557   void ensureSignal() const {
558     folly::SpinLockGuard g(spinlock_);
559     ensureSignalLocked();
560   }
561
562   void syncSignalAndQueue() {
563     folly::SpinLockGuard g(spinlock_);
564
565     if (queue_.empty()) {
566       drainSignalsLocked();
567     } else {
568       ensureSignalLocked();
569     }
570   }
571
572   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
573     checkPid();
574     bool signal = false;
575     {
576       folly::SpinLockGuard g(spinlock_);
577       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
578         return false;
579       }
580       // We only need to signal an event if not all consumers are
581       // awake.
582       if (numActiveConsumers_ < numConsumers_) {
583         signal = true;
584       }
585       queue_.emplace_back(std::move(message), RequestContext::saveContext());
586       if (signal) {
587         ensureSignalLocked();
588       }
589     }
590     return true;
591   }
592
593   bool putMessageImpl(
594     const MessageT& message, size_t maxSize, bool throws=true) {
595     checkPid();
596     bool signal = false;
597     {
598       folly::SpinLockGuard g(spinlock_);
599       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
600         return false;
601       }
602       if (numActiveConsumers_ < numConsumers_) {
603         signal = true;
604       }
605       queue_.emplace_back(message, RequestContext::saveContext());
606       if (signal) {
607         ensureSignalLocked();
608       }
609     }
610     return true;
611   }
612
613   template<typename InputIteratorT>
614   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
615                        std::input_iterator_tag) {
616     checkPid();
617     bool signal = false;
618     size_t numAdded = 0;
619     {
620       folly::SpinLockGuard g(spinlock_);
621       checkDraining();
622       while (first != last) {
623         queue_.emplace_back(*first, RequestContext::saveContext());
624         ++first;
625         ++numAdded;
626       }
627       if (numActiveConsumers_ < numConsumers_) {
628         signal = true;
629       }
630       if (signal) {
631         ensureSignalLocked();
632       }
633     }
634   }
635
636   mutable folly::SpinLock spinlock_;
637   mutable bool signal_{false};
638   int eventfd_;
639   int pipeFds_[2]; // to fallback to on older/non-linux systems
640   uint32_t advisoryMaxQueueSize_;
641   pid_t pid_;
642   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
643   int numConsumers_{0};
644   std::atomic<int> numActiveConsumers_{0};
645   bool draining_{false};
646 };
647
648 template<typename MessageT>
649 void NotificationQueue<MessageT>::Consumer::destroy() {
650   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
651   // will be non-nullptr.  Mark the value that it points to, so that
652   // handlerReady() will know the callback is destroyed, and that it cannot
653   // access any member variables anymore.
654   if (destroyedFlagPtr_) {
655     *destroyedFlagPtr_ = true;
656   }
657   stopConsuming();
658   DelayedDestruction::destroy();
659 }
660
661 template<typename MessageT>
662 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
663     noexcept {
664   consumeMessages(false);
665 }
666
667 template<typename MessageT>
668 void NotificationQueue<MessageT>::Consumer::consumeMessages(
669     bool isDrain, size_t* numConsumed) noexcept {
670   DestructorGuard dg(this);
671   uint32_t numProcessed = 0;
672   setActive(true);
673   SCOPE_EXIT {
674     if (queue_) {
675       queue_->syncSignalAndQueue();
676     }
677   };
678   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
679   SCOPE_EXIT {
680     if (numConsumed != nullptr) {
681       *numConsumed = numProcessed;
682     }
683   };
684   while (true) {
685     // Now pop the message off of the queue.
686     //
687     // We have to manually acquire and release the spinlock here, rather than
688     // using SpinLockHolder since the MessageT has to be constructed while
689     // holding the spinlock and available after we release it.  SpinLockHolder
690     // unfortunately doesn't provide a release() method.  (We can't construct
691     // MessageT first since we have no guarantee that MessageT has a default
692     // constructor.
693     queue_->spinlock_.lock();
694     bool locked = true;
695
696     try {
697       if (UNLIKELY(queue_->queue_.empty())) {
698         // If there is no message, we've reached the end of the queue, return.
699         setActive(false);
700         queue_->spinlock_.unlock();
701         return;
702       }
703
704       // Pull a message off the queue.
705       auto& data = queue_->queue_.front();
706
707       MessageT msg(std::move(data.first));
708       RequestContextScopeGuard rctx(std::move(data.second));
709       queue_->queue_.pop_front();
710
711       // Check to see if the queue is empty now.
712       // We use this as an optimization to see if we should bother trying to
713       // loop again and read another message after invoking this callback.
714       bool wasEmpty = queue_->queue_.empty();
715       if (wasEmpty) {
716         setActive(false);
717       }
718
719       // Now unlock the spinlock before we invoke the callback.
720       queue_->spinlock_.unlock();
721       locked = false;
722
723       // Call the callback
724       bool callbackDestroyed = false;
725       CHECK(destroyedFlagPtr_ == nullptr);
726       destroyedFlagPtr_ = &callbackDestroyed;
727       messageAvailable(std::move(msg));
728       destroyedFlagPtr_ = nullptr;
729
730       // If the callback was destroyed before it returned, we are done
731       if (callbackDestroyed) {
732         return;
733       }
734
735       // If the callback is no longer installed, we are done.
736       if (queue_ == nullptr) {
737         return;
738       }
739
740       // If we have hit maxReadAtOnce_, we are done.
741       ++numProcessed;
742       if (!isDrain && maxReadAtOnce_ > 0 &&
743           numProcessed >= maxReadAtOnce_) {
744         return;
745       }
746
747       // If the queue was empty before we invoked the callback, it's probable
748       // that it is still empty now.  Just go ahead and return, rather than
749       // looping again and trying to re-read from the eventfd.  (If a new
750       // message had in fact arrived while we were invoking the callback, we
751       // will simply be woken up the next time around the event loop and will
752       // process the message then.)
753       if (wasEmpty) {
754         return;
755       }
756     } catch (const std::exception&) {
757       // This catch block is really just to handle the case where the MessageT
758       // constructor throws.  The messageAvailable() callback itself is
759       // declared as noexcept and should never throw.
760       //
761       // If the MessageT constructor does throw we try to handle it as best as
762       // we can, but we can't work miracles.  We will just ignore the error for
763       // now and return.  The next time around the event loop we will end up
764       // trying to read the message again.  If MessageT continues to throw we
765       // will never make forward progress and will keep trying each time around
766       // the event loop.
767       if (locked) {
768         // Unlock the spinlock.
769         queue_->spinlock_.unlock();
770       }
771
772       return;
773     }
774   }
775 }
776
777 template<typename MessageT>
778 void NotificationQueue<MessageT>::Consumer::init(
779     EventBase* eventBase,
780     NotificationQueue* queue) {
781   assert(eventBase->isInEventBaseThread());
782   assert(queue_ == nullptr);
783   assert(!isHandlerRegistered());
784   queue->checkPid();
785
786   base_ = eventBase;
787
788   queue_ = queue;
789
790   {
791     folly::SpinLockGuard g(queue_->spinlock_);
792     queue_->numConsumers_++;
793   }
794   queue_->ensureSignal();
795
796   if (queue_->eventfd_ >= 0) {
797     initHandler(eventBase, queue_->eventfd_);
798   } else {
799     initHandler(eventBase, queue_->pipeFds_[0]);
800   }
801 }
802
803 template<typename MessageT>
804 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
805   if (queue_ == nullptr) {
806     assert(!isHandlerRegistered());
807     return;
808   }
809
810   {
811     folly::SpinLockGuard g(queue_->spinlock_);
812     queue_->numConsumers_--;
813     setActive(false);
814   }
815
816   assert(isHandlerRegistered());
817   unregisterHandler();
818   detachEventBase();
819   queue_ = nullptr;
820 }
821
822 template<typename MessageT>
823 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
824     size_t* numConsumed) noexcept {
825   DestructorGuard dg(this);
826   {
827     folly::SpinLockGuard g(queue_->spinlock_);
828     if (queue_->draining_) {
829       return false;
830     }
831     queue_->draining_ = true;
832   }
833   consumeMessages(true, numConsumed);
834   {
835     folly::SpinLockGuard g(queue_->spinlock_);
836     queue_->draining_ = false;
837   }
838   return true;
839 }
840
841 /**
842  * Creates a NotificationQueue::Consumer wrapping a function object
843  * Modeled after AsyncTimeout::make
844  *
845  */
846
847 namespace detail {
848
849 template <typename MessageT, typename TCallback>
850 struct notification_queue_consumer_wrapper
851     : public NotificationQueue<MessageT>::Consumer {
852
853   template <typename UCallback>
854   explicit notification_queue_consumer_wrapper(UCallback&& callback)
855       : callback_(std::forward<UCallback>(callback)) {}
856
857   // we are being stricter here and requiring noexcept for callback
858   void messageAvailable(MessageT&& message) noexcept override {
859     static_assert(
860       noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
861       "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
862     );
863
864     callback_(std::forward<MessageT>(message));
865   }
866
867  private:
868   TCallback callback_;
869 };
870
871 } // namespace detail
872
873 template <typename MessageT>
874 template <typename TCallback>
875 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
876                 DelayedDestruction::Destructor>
877 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
878   return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
879                          DelayedDestruction::Destructor>(
880       new detail::notification_queue_consumer_wrapper<
881           MessageT,
882           typename std::decay<TCallback>::type>(
883           std::forward<TCallback>(callback)));
884 }
885
886 } // folly