Support NotificationQueue on iOS/Android
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include <folly/io/PortableSpinLock.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/io/async/Request.h>
26 #include <folly/Likely.h>
27 #include <folly/ScopeGuard.h>
28
29 #include <glog/logging.h>
30 #include <deque>
31
32 #if __linux__ && !__ANDROID__
33 #define FOLLY_HAVE_EVENTFD
34 #include <folly/io/async/EventFDWrapper.h>
35 #endif
36
37 namespace folly {
38
39 /**
40  * A producer-consumer queue for passing messages between EventBase threads.
41  *
42  * Messages can be added to the queue from any thread.  Multiple consumers may
43  * listen to the queue from multiple EventBase threads.
44  *
45  * A NotificationQueue may not be destroyed while there are still consumers
46  * registered to receive events from the queue.  It is the user's
47  * responsibility to ensure that all consumers are unregistered before the
48  * queue is destroyed.
49  *
50  * MessageT should be MoveConstructible (i.e., must support either a move
51  * constructor or a copy constructor, or both).  Ideally it's move constructor
52  * (or copy constructor if no move constructor is provided) should never throw
53  * exceptions.  If the constructor may throw, the consumers could end up
54  * spinning trying to move a message off the queue and failing, and then
55  * retrying.
56  */
57 template<typename MessageT>
58 class NotificationQueue {
59  public:
60   /**
61    * A callback interface for consuming messages from the queue as they arrive.
62    */
63   class Consumer : private EventHandler {
64    public:
65     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
66
67     Consumer()
68       : queue_(nullptr),
69         destroyedFlagPtr_(nullptr),
70         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
71
72     virtual ~Consumer();
73
74     /**
75      * messageAvailable() will be invoked whenever a new
76      * message is available from the pipe.
77      */
78     virtual void messageAvailable(MessageT&& message) = 0;
79
80     /**
81      * Begin consuming messages from the specified queue.
82      *
83      * messageAvailable() will be called whenever a message is available.  This
84      * consumer will continue to consume messages until stopConsuming() is
85      * called.
86      *
87      * A Consumer may only consume messages from a single NotificationQueue at
88      * a time.  startConsuming() should not be called if this consumer is
89      * already consuming.
90      */
91     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
92       init(eventBase, queue);
93       registerHandler(READ | PERSIST);
94     }
95
96     /**
97      * Same as above but registers this event handler as internal so that it
98      * doesn't count towards the pending reader count for the IOLoop.
99      */
100     void startConsumingInternal(
101         EventBase* eventBase, NotificationQueue* queue) {
102       init(eventBase, queue);
103       registerInternalHandler(READ | PERSIST);
104     }
105
106     /**
107      * Stop consuming messages.
108      *
109      * startConsuming() may be called again to resume consumption of messages
110      * at a later point in time.
111      */
112     void stopConsuming();
113
114     /**
115      * Get the NotificationQueue that this consumer is currently consuming
116      * messages from.  Returns nullptr if the consumer is not currently
117      * consuming events from any queue.
118      */
119     NotificationQueue* getCurrentQueue() const {
120       return queue_;
121     }
122
123     /**
124      * Set a limit on how many messages this consumer will read each iteration
125      * around the event loop.
126      *
127      * This helps rate-limit how much work the Consumer will do each event loop
128      * iteration, to prevent it from starving other event handlers.
129      *
130      * A limit of 0 means no limit will be enforced.  If unset, the limit
131      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
132      */
133     void setMaxReadAtOnce(uint32_t maxAtOnce) {
134       maxReadAtOnce_ = maxAtOnce;
135     }
136     uint32_t getMaxReadAtOnce() const {
137       return maxReadAtOnce_;
138     }
139
140     EventBase* getEventBase() {
141       return base_;
142     }
143
144     virtual void handlerReady(uint16_t events) noexcept;
145
146    private:
147
148     void setActive(bool active, bool shouldLock = false) {
149       if (!queue_) {
150         active_ = active;
151         return;
152       }
153       if (shouldLock) {
154         queue_->spinlock_.lock();
155       }
156       if (!active_ && active) {
157         ++queue_->numActiveConsumers_;
158       } else if (active_ && !active) {
159         --queue_->numActiveConsumers_;
160       }
161       active_ = active;
162       if (shouldLock) {
163         queue_->spinlock_.unlock();
164       }
165     }
166     void init(EventBase* eventBase, NotificationQueue* queue);
167
168     NotificationQueue* queue_;
169     bool* destroyedFlagPtr_;
170     uint32_t maxReadAtOnce_;
171     EventBase* base_;
172     bool active_{false};
173   };
174
175   enum class FdType {
176     PIPE,
177 #ifdef FOLLY_HAVE_EVENTFD
178     EVENTFD,
179 #endif
180   };
181
182   /**
183    * Create a new NotificationQueue.
184    *
185    * If the maxSize parameter is specified, this sets the maximum queue size
186    * that will be enforced by tryPutMessage().  (This size is advisory, and may
187    * be exceeded if producers explicitly use putMessage() instead of
188    * tryPutMessage().)
189    *
190    * The fdType parameter determines the type of file descriptor used
191    * internally to signal message availability.  The default (eventfd) is
192    * preferable for performance and because it won't fail when the queue gets
193    * too long.  It is not available on on older and non-linux kernels, however.
194    * In this case the code will fall back to using a pipe, the parameter is
195    * mostly for testing purposes.
196    */
197   explicit NotificationQueue(uint32_t maxSize = 0,
198 #ifdef FOLLY_HAVE_EVENTFD
199                              FdType fdType = FdType::EVENTFD)
200 #else
201                              FdType fdType = FdType::PIPE)
202 #endif
203     : eventfd_(-1),
204       pipeFds_{-1, -1},
205       advisoryMaxQueueSize_(maxSize),
206       pid_(getpid()),
207       queue_() {
208
209     RequestContext::getStaticContext();
210
211 #ifdef FOLLY_HAVE_EVENTFD
212     if (fdType == FdType::EVENTFD) {
213       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
214       if (eventfd_ == -1) {
215         if (errno == ENOSYS || errno == EINVAL) {
216           // eventfd not availalble
217           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
218                      << errno << ", falling back to pipe mode (is your kernel "
219                      << "> 2.6.30?)";
220           fdType = FdType::PIPE;
221         } else {
222           // some other error
223           folly::throwSystemError("Failed to create eventfd for "
224                                   "NotificationQueue", errno);
225         }
226       }
227     }
228 #endif
229     if (fdType == FdType::PIPE) {
230       if (pipe(pipeFds_)) {
231         folly::throwSystemError("Failed to create pipe for NotificationQueue",
232                                 errno);
233       }
234       try {
235         // put both ends of the pipe into non-blocking mode
236         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
237           folly::throwSystemError("failed to put NotificationQueue pipe read "
238                                   "endpoint into non-blocking mode", errno);
239         }
240         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
241           folly::throwSystemError("failed to put NotificationQueue pipe write "
242                                   "endpoint into non-blocking mode", errno);
243         }
244       } catch (...) {
245         ::close(pipeFds_[0]);
246         ::close(pipeFds_[1]);
247         throw;
248       }
249     }
250   }
251
252   ~NotificationQueue() {
253     if (eventfd_ >= 0) {
254       ::close(eventfd_);
255       eventfd_ = -1;
256     }
257     if (pipeFds_[0] >= 0) {
258       ::close(pipeFds_[0]);
259       pipeFds_[0] = -1;
260     }
261     if (pipeFds_[1] >= 0) {
262       ::close(pipeFds_[1]);
263       pipeFds_[1] = -1;
264     }
265   }
266
267   /**
268    * Set the advisory maximum queue size.
269    *
270    * This maximum queue size affects calls to tryPutMessage().  Message
271    * producers can still use the putMessage() call to unconditionally put a
272    * message on the queue, ignoring the configured maximum queue size.  This
273    * can cause the queue size to exceed the configured maximum.
274    */
275   void setMaxQueueSize(uint32_t max) {
276     advisoryMaxQueueSize_ = max;
277   }
278
279   /**
280    * Attempt to put a message on the queue if the queue is not already full.
281    *
282    * If the queue is full, a std::overflow_error will be thrown.  The
283    * setMaxQueueSize() function controls the maximum queue size.
284    *
285    * This method may contend briefly on a spinlock if many threads are
286    * concurrently accessing the queue, but for all intents and purposes it will
287    * immediately place the message on the queue and return.
288    *
289    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
290    * may throw any other exception thrown by the MessageT move/copy
291    * constructor.
292    */
293   void tryPutMessage(MessageT&& message) {
294     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
295   }
296   void tryPutMessage(const MessageT& message) {
297     putMessageImpl(message, advisoryMaxQueueSize_);
298   }
299
300   /**
301    * No-throw versions of the above.  Instead returns true on success, false on
302    * failure.
303    *
304    * Only std::overflow_error is prevented from being thrown (since this is the
305    * common exception case), user code must still catch std::bad_alloc errors.
306    */
307   bool tryPutMessageNoThrow(MessageT&& message) {
308     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
309   }
310   bool tryPutMessageNoThrow(const MessageT& message) {
311     return putMessageImpl(message, advisoryMaxQueueSize_, false);
312   }
313
314   /**
315    * Unconditionally put a message on the queue.
316    *
317    * This method is like tryPutMessage(), but ignores the maximum queue size
318    * and always puts the message on the queue, even if the maximum queue size
319    * would be exceeded.
320    *
321    * putMessage() may throw std::bad_alloc if memory allocation fails, and may
322    * throw any other exception thrown by the MessageT move/copy constructor.
323    */
324   void putMessage(MessageT&& message) {
325     putMessageImpl(std::move(message), 0);
326   }
327   void putMessage(const MessageT& message) {
328     putMessageImpl(message, 0);
329   }
330
331   /**
332    * Put several messages on the queue.
333    */
334   template<typename InputIteratorT>
335   void putMessages(InputIteratorT first, InputIteratorT last) {
336     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
337       IterCategory;
338     putMessagesImpl(first, last, IterCategory());
339   }
340
341   /**
342    * Try to immediately pull a message off of the queue, without blocking.
343    *
344    * If a message is immediately available, the result parameter will be
345    * updated to contain the message contents and true will be returned.
346    *
347    * If no message is available, false will be returned and result will be left
348    * unmodified.
349    */
350   bool tryConsume(MessageT& result) {
351     checkPid();
352
353     try {
354
355       folly::io::PortableSpinLockGuard g(spinlock_);
356
357       if (UNLIKELY(queue_.empty())) {
358         return false;
359       }
360
361       auto data = std::move(queue_.front());
362       result = data.first;
363       RequestContext::setContext(data.second);
364
365       queue_.pop_front();
366     } catch (...) {
367       // Handle an exception if the assignment operator happens to throw.
368       // We consumed an event but weren't able to pop the message off the
369       // queue.  Signal the event again since the message is still in the
370       // queue.
371       signalEvent(1);
372       throw;
373     }
374
375     return true;
376   }
377
378   int size() {
379     folly::io::PortableSpinLockGuard g(spinlock_);
380     return queue_.size();
381   }
382
383   /**
384    * Check that the NotificationQueue is being used from the correct process.
385    *
386    * If you create a NotificationQueue in one process, then fork, and try to
387    * send messages to the queue from the child process, you're going to have a
388    * bad time.  Unfortunately users have (accidentally) run into this.
389    *
390    * Because we use an eventfd/pipe, the child process can actually signal the
391    * parent process that an event is ready.  However, it can't put anything on
392    * the parent's queue, so the parent wakes up and finds an empty queue.  This
393    * check ensures that we catch the problem in the misbehaving child process
394    * code, and crash before signalling the parent process.
395    */
396   void checkPid() const {
397     CHECK_EQ(pid_, getpid());
398   }
399
400  private:
401   // Forbidden copy constructor and assignment operator
402   NotificationQueue(NotificationQueue const &) = delete;
403   NotificationQueue& operator=(NotificationQueue const &) = delete;
404
405   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
406     DCHECK(0 == spinlock_.trylock());
407     if (maxSize > 0 && queue_.size() >= maxSize) {
408       if (throws) {
409         throw std::overflow_error("unable to add message to NotificationQueue: "
410                                   "queue is full");
411       }
412       return false;
413     }
414     return true;
415   }
416
417   inline void signalEvent(size_t numAdded = 1) const {
418     static const uint8_t kPipeMessage[] = {
419       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
420     };
421
422     ssize_t bytes_written = 0;
423     ssize_t bytes_expected = 0;
424     if (eventfd_ >= 0) {
425       // eventfd(2) dictates that we must write a 64-bit integer
426       uint64_t numAdded64(numAdded);
427       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
428       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
429     } else {
430       // pipe semantics, add one message for each numAdded
431       bytes_expected = numAdded;
432       do {
433         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
434         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
435         if (rc < 0) {
436           // TODO: if the pipe is full, write will fail with EAGAIN.
437           // See task #1044651 for how this could be handled
438           break;
439         }
440         numAdded -= rc;
441         bytes_written += rc;
442       } while (numAdded > 0);
443     }
444     if (bytes_written != bytes_expected) {
445       folly::throwSystemError("failed to signal NotificationQueue after "
446                               "write", errno);
447     }
448   }
449
450   bool tryConsumeEvent() {
451     uint64_t value = 0;
452     ssize_t rc = -1;
453     if (eventfd_ >= 0) {
454       rc = ::read(eventfd_, &value, sizeof(value));
455     } else {
456       uint8_t value8;
457       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
458       value = value8;
459     }
460     if (rc < 0) {
461       // EAGAIN should pretty much be the only error we can ever get.
462       // This means someone else already processed the only available message.
463       assert(errno == EAGAIN);
464       return false;
465     }
466     assert(value == 1);
467     return true;
468   }
469
470   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
471     checkPid();
472     bool signal = false;
473     {
474       folly::io::PortableSpinLockGuard g(spinlock_);
475       if (!checkQueueSize(maxSize, throws)) {
476         return false;
477       }
478       // We only need to signal an event if not all consumers are
479       // awake.
480       if (numActiveConsumers_ < numConsumers_) {
481         signal = true;
482       }
483       queue_.push_back(
484         std::make_pair(std::move(message),
485                        RequestContext::saveContext()));
486     }
487     if (signal) {
488       signalEvent();
489     }
490     return true;
491   }
492
493   bool putMessageImpl(
494     const MessageT& message, size_t maxSize, bool throws=true) {
495     checkPid();
496     bool signal = false;
497     {
498       folly::io::PortableSpinLockGuard g(spinlock_);
499       if (!checkQueueSize(maxSize, throws)) {
500         return false;
501       }
502       if (numActiveConsumers_ < numConsumers_) {
503         signal = true;
504       }
505       queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
506     }
507     if (signal) {
508       signalEvent();
509     }
510     return true;
511   }
512
513   template<typename InputIteratorT>
514   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
515                        std::input_iterator_tag) {
516     checkPid();
517     bool signal = false;
518     size_t numAdded = 0;
519     {
520       folly::io::PortableSpinLockGuard g(spinlock_);
521       while (first != last) {
522         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
523         ++first;
524         ++numAdded;
525       }
526       if (numActiveConsumers_ < numConsumers_) {
527         signal = true;
528       }
529     }
530     if (signal) {
531       signalEvent();
532     }
533   }
534
535   mutable folly::io::PortableSpinLock spinlock_;
536   int eventfd_;
537   int pipeFds_[2]; // to fallback to on older/non-linux systems
538   uint32_t advisoryMaxQueueSize_;
539   pid_t pid_;
540   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
541   int numConsumers_{0};
542   std::atomic<int> numActiveConsumers_{0};
543 };
544
545 template<typename MessageT>
546 NotificationQueue<MessageT>::Consumer::~Consumer() {
547   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
548   // will be non-nullptr.  Mark the value that it points to, so that
549   // handlerReady() will know the callback is destroyed, and that it cannot
550   // access any member variables anymore.
551   if (destroyedFlagPtr_) {
552     *destroyedFlagPtr_ = true;
553   }
554 }
555
556 template<typename MessageT>
557 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
558     noexcept {
559   uint32_t numProcessed = 0;
560   bool firstRun = true;
561   setActive(true);
562   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
563   while (true) {
564     // Try to decrement the eventfd.
565     //
566     // The eventfd is only used to wake up the consumer - there may or
567     // may not actually be an event available (another consumer may
568     // have read it).  We don't really care, we only care about
569     // emptying the queue.
570     if (firstRun) {
571       queue_->tryConsumeEvent();
572       firstRun = false;
573     }
574
575     // Now pop the message off of the queue.
576     //
577     // We have to manually acquire and release the spinlock here, rather than
578     // using SpinLockHolder since the MessageT has to be constructed while
579     // holding the spinlock and available after we release it.  SpinLockHolder
580     // unfortunately doesn't provide a release() method.  (We can't construct
581     // MessageT first since we have no guarantee that MessageT has a default
582     // constructor.
583     queue_->spinlock_.lock();
584     bool locked = true;
585
586     try {
587       if (UNLIKELY(queue_->queue_.empty())) {
588         // If there is no message, we've reached the end of the queue, return.
589         setActive(false);
590         queue_->spinlock_.unlock();
591         return;
592       }
593
594       // Pull a message off the queue.
595       auto& data = queue_->queue_.front();
596
597       MessageT msg(std::move(data.first));
598       auto old_ctx =
599         RequestContext::setContext(data.second);
600       queue_->queue_.pop_front();
601
602       // Check to see if the queue is empty now.
603       // We use this as an optimization to see if we should bother trying to
604       // loop again and read another message after invoking this callback.
605       bool wasEmpty = queue_->queue_.empty();
606       if (wasEmpty) {
607         setActive(false);
608       }
609
610       // Now unlock the spinlock before we invoke the callback.
611       queue_->spinlock_.unlock();
612       locked = false;
613
614       // Call the callback
615       bool callbackDestroyed = false;
616       CHECK(destroyedFlagPtr_ == nullptr);
617       destroyedFlagPtr_ = &callbackDestroyed;
618       messageAvailable(std::move(msg));
619
620       RequestContext::setContext(old_ctx);
621
622       // If the callback was destroyed before it returned, we are done
623       if (callbackDestroyed) {
624         return;
625       }
626       destroyedFlagPtr_ = nullptr;
627
628       // If the callback is no longer installed, we are done.
629       if (queue_ == nullptr) {
630         return;
631       }
632
633       // If we have hit maxReadAtOnce_, we are done.
634       ++numProcessed;
635       if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
636         queue_->signalEvent(1);
637         return;
638       }
639
640       // If the queue was empty before we invoked the callback, it's probable
641       // that it is still empty now.  Just go ahead and return, rather than
642       // looping again and trying to re-read from the eventfd.  (If a new
643       // message had in fact arrived while we were invoking the callback, we
644       // will simply be woken up the next time around the event loop and will
645       // process the message then.)
646       if (wasEmpty) {
647         return;
648       }
649     } catch (const std::exception& ex) {
650       // This catch block is really just to handle the case where the MessageT
651       // constructor throws.  The messageAvailable() callback itself is
652       // declared as noexcept and should never throw.
653       //
654       // If the MessageT constructor does throw we try to handle it as best as
655       // we can, but we can't work miracles.  We will just ignore the error for
656       // now and return.  The next time around the event loop we will end up
657       // trying to read the message again.  If MessageT continues to throw we
658       // will never make forward progress and will keep trying each time around
659       // the event loop.
660       if (locked) {
661         // Unlock the spinlock.
662         queue_->spinlock_.unlock();
663
664         // Push a notification back on the eventfd since we didn't actually
665         // read the message off of the queue.
666         queue_->signalEvent(1);
667       }
668
669       return;
670     }
671   }
672 }
673
674 template<typename MessageT>
675 void NotificationQueue<MessageT>::Consumer::init(
676     EventBase* eventBase,
677     NotificationQueue* queue) {
678   assert(eventBase->isInEventBaseThread());
679   assert(queue_ == nullptr);
680   assert(!isHandlerRegistered());
681   queue->checkPid();
682
683   base_ = eventBase;
684
685   queue_ = queue;
686
687   {
688     folly::io::PortableSpinLockGuard g(queue_->spinlock_);
689     queue_->numConsumers_++;
690   }
691   queue_->signalEvent();
692
693   if (queue_->eventfd_ >= 0) {
694     initHandler(eventBase, queue_->eventfd_);
695   } else {
696     initHandler(eventBase, queue_->pipeFds_[0]);
697   }
698 }
699
700 template<typename MessageT>
701 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
702   if (queue_ == nullptr) {
703     assert(!isHandlerRegistered());
704     return;
705   }
706
707   {
708     folly::io::PortableSpinLockGuard g(queue_->spinlock_);
709     queue_->numConsumers_--;
710     setActive(false);
711   }
712
713   assert(isHandlerRegistered());
714   unregisterHandler();
715   detachEventBase();
716   queue_ = nullptr;
717 }
718
719 } // folly