d4996a07cd30a0cbd6ac6a0735e5d9e3aea6f0d6
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #pragma once
20
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 #include "folly/io/async/EventBase.h"
25 #include "folly/io/async/EventFDWrapper.h"
26 #include "folly/io/async/EventHandler.h"
27 #include "folly/io/async/Request.h"
28 #include "folly/Likely.h"
29 #include "folly/SmallLocks.h"
30 #include "folly/ScopeGuard.h"
31
32 #include <glog/logging.h>
33 #include <deque>
34
35 namespace folly {
36
37 /**
38  * A producer-consumer queue for passing messages between EventBase threads.
39  *
40  * Messages can be added to the queue from any thread.  Multiple consumers may
41  * listen to the queue from multiple EventBase threads.
42  *
43  * A NotificationQueue may not be destroyed while there are still consumers
44  * registered to receive events from the queue.  It is the user's
45  * responsibility to ensure that all consumers are unregistered before the
46  * queue is destroyed.
47  *
48  * MessageT should be MoveConstructible (i.e., must support either a move
49  * constructor or a copy constructor, or both).  Ideally it's move constructor
50  * (or copy constructor if no move constructor is provided) should never throw
51  * exceptions.  If the constructor may throw, the consumers could end up
52  * spinning trying to move a message off the queue and failing, and then
53  * retrying.
54  */
55 template<typename MessageT>
56 class NotificationQueue {
57  public:
58   /**
59    * A callback interface for consuming messages from the queue as they arrive.
60    */
61   class Consumer : private EventHandler {
62    public:
63     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
64
65     Consumer()
66       : queue_(nullptr),
67         destroyedFlagPtr_(nullptr),
68         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
69
70     virtual ~Consumer();
71
72     /**
73      * messageAvailable() will be invoked whenever a new
74      * message is available from the pipe.
75      */
76     virtual void messageAvailable(MessageT&& message) = 0;
77
78     /**
79      * Begin consuming messages from the specified queue.
80      *
81      * messageAvailable() will be called whenever a message is available.  This
82      * consumer will continue to consume messages until stopConsuming() is
83      * called.
84      *
85      * A Consumer may only consume messages from a single NotificationQueue at
86      * a time.  startConsuming() should not be called if this consumer is
87      * already consuming.
88      */
89     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
90       init(eventBase, queue);
91       registerHandler(READ | PERSIST);
92     }
93
94     /**
95      * Same as above but registers this event handler as internal so that it
96      * doesn't count towards the pending reader count for the IOLoop.
97      */
98     void startConsumingInternal(
99         EventBase* eventBase, NotificationQueue* queue) {
100       init(eventBase, queue);
101       registerInternalHandler(READ | PERSIST);
102     }
103
104     /**
105      * Stop consuming messages.
106      *
107      * startConsuming() may be called again to resume consumption of messages
108      * at a later point in time.
109      */
110     void stopConsuming();
111
112     /**
113      * Get the NotificationQueue that this consumer is currently consuming
114      * messages from.  Returns nullptr if the consumer is not currently
115      * consuming events from any queue.
116      */
117     NotificationQueue* getCurrentQueue() const {
118       return queue_;
119     }
120
121     /**
122      * Set a limit on how many messages this consumer will read each iteration
123      * around the event loop.
124      *
125      * This helps rate-limit how much work the Consumer will do each event loop
126      * iteration, to prevent it from starving other event handlers.
127      *
128      * A limit of 0 means no limit will be enforced.  If unset, the limit
129      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
130      */
131     void setMaxReadAtOnce(uint32_t maxAtOnce) {
132       maxReadAtOnce_ = maxAtOnce;
133     }
134     uint32_t getMaxReadAtOnce() const {
135       return maxReadAtOnce_;
136     }
137
138     EventBase* getEventBase() {
139       return base_;
140     }
141
142     virtual void handlerReady(uint16_t events) noexcept;
143
144    private:
145
146     void setActive(bool active) {
147       DCHECK(queue_);
148       if (!active_ && active) {
149         ++queue_->numActiveConsumers_;
150       } else if (active_ && !active) {
151         --queue_->numActiveConsumers_;
152       }
153       active_ = active;
154     }
155     void init(EventBase* eventBase, NotificationQueue* queue);
156
157     NotificationQueue* queue_;
158     bool* destroyedFlagPtr_;
159     uint32_t maxReadAtOnce_;
160     EventBase* base_;
161     bool active_{false};
162   };
163
164   enum class FdType {
165     EVENTFD,
166     PIPE
167   };
168
169   /**
170    * Create a new NotificationQueue.
171    *
172    * If the maxSize parameter is specified, this sets the maximum queue size
173    * that will be enforced by tryPutMessage().  (This size is advisory, and may
174    * be exceeded if producers explicitly use putMessage() instead of
175    * tryPutMessage().)
176    *
177    * The fdType parameter determines the type of file descriptor used
178    * internally to signal message availability.  The default (eventfd) is
179    * preferable for performance and because it won't fail when the queue gets
180    * too long.  It is not available on on older and non-linux kernels, however.
181    * In this case the code will fall back to using a pipe, the parameter is
182    * mostly for testing purposes.
183    */
184   explicit NotificationQueue(uint32_t maxSize = 0,
185                               FdType fdType = FdType::EVENTFD)
186     : eventfd_(-1),
187       pipeFds_{-1, -1},
188       advisoryMaxQueueSize_(maxSize),
189       pid_(getpid()),
190       queue_() {
191
192     spinlock_.init();
193
194     RequestContext::getStaticContext();
195
196     if (fdType == FdType::EVENTFD) {
197       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
198       if (eventfd_ == -1) {
199         if (errno == ENOSYS || errno == EINVAL) {
200           // eventfd not availalble
201           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
202                      << errno << ", falling back to pipe mode (is your kernel "
203                      << "> 2.6.30?)";
204           fdType = FdType::PIPE;
205         } else {
206           // some other error
207           folly::throwSystemError("Failed to create eventfd for "
208                                   "NotificationQueue", errno);
209         }
210       }
211     }
212     if (fdType == FdType::PIPE) {
213       if (pipe(pipeFds_)) {
214         folly::throwSystemError("Failed to create pipe for NotificationQueue",
215                                 errno);
216       }
217       try {
218         // put both ends of the pipe into non-blocking mode
219         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
220           folly::throwSystemError("failed to put NotificationQueue pipe read "
221                                   "endpoint into non-blocking mode", errno);
222         }
223         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
224           folly::throwSystemError("failed to put NotificationQueue pipe write "
225                                   "endpoint into non-blocking mode", errno);
226         }
227       } catch (...) {
228         ::close(pipeFds_[0]);
229         ::close(pipeFds_[1]);
230         throw;
231       }
232     }
233   }
234
235   ~NotificationQueue() {
236     if (eventfd_ >= 0) {
237       ::close(eventfd_);
238       eventfd_ = -1;
239     }
240     if (pipeFds_[0] >= 0) {
241       ::close(pipeFds_[0]);
242       pipeFds_[0] = -1;
243     }
244     if (pipeFds_[1] >= 0) {
245       ::close(pipeFds_[1]);
246       pipeFds_[1] = -1;
247     }
248   }
249
250   /**
251    * Set the advisory maximum queue size.
252    *
253    * This maximum queue size affects calls to tryPutMessage().  Message
254    * producers can still use the putMessage() call to unconditionally put a
255    * message on the queue, ignoring the configured maximum queue size.  This
256    * can cause the queue size to exceed the configured maximum.
257    */
258   void setMaxQueueSize(uint32_t max) {
259     advisoryMaxQueueSize_ = max;
260   }
261
262   /**
263    * Attempt to put a message on the queue if the queue is not already full.
264    *
265    * If the queue is full, a std::overflow_error will be thrown.  The
266    * setMaxQueueSize() function controls the maximum queue size.
267    *
268    * This method may contend briefly on a spinlock if many threads are
269    * concurrently accessing the queue, but for all intents and purposes it will
270    * immediately place the message on the queue and return.
271    *
272    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
273    * may throw any other exception thrown by the MessageT move/copy
274    * constructor.
275    */
276   void tryPutMessage(MessageT&& message) {
277     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
278   }
279   void tryPutMessage(const MessageT& message) {
280     putMessageImpl(message, advisoryMaxQueueSize_);
281   }
282
283   /**
284    * No-throw versions of the above.  Instead returns true on success, false on
285    * failure.
286    *
287    * Only std::overflow_error is prevented from being thrown (since this is the
288    * common exception case), user code must still catch std::bad_alloc errors.
289    */
290   bool tryPutMessageNoThrow(MessageT&& message) {
291     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
292   }
293   bool tryPutMessageNoThrow(const MessageT& message) {
294     return putMessageImpl(message, advisoryMaxQueueSize_, false);
295   }
296
297   /**
298    * Unconditionally put a message on the queue.
299    *
300    * This method is like tryPutMessage(), but ignores the maximum queue size
301    * and always puts the message on the queue, even if the maximum queue size
302    * would be exceeded.
303    *
304    * putMessage() may throw std::bad_alloc if memory allocation fails, and may
305    * throw any other exception thrown by the MessageT move/copy constructor.
306    */
307   void putMessage(MessageT&& message) {
308     putMessageImpl(std::move(message), 0);
309   }
310   void putMessage(const MessageT& message) {
311     putMessageImpl(message, 0);
312   }
313
314   /**
315    * Put several messages on the queue.
316    */
317   template<typename InputIteratorT>
318   void putMessages(InputIteratorT first, InputIteratorT last) {
319     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
320       IterCategory;
321     putMessagesImpl(first, last, IterCategory());
322   }
323
324   /**
325    * Try to immediately pull a message off of the queue, without blocking.
326    *
327    * If a message is immediately available, the result parameter will be
328    * updated to contain the message contents and true will be returned.
329    *
330    * If no message is available, false will be returned and result will be left
331    * unmodified.
332    */
333   bool tryConsume(MessageT& result) {
334     checkPid();
335
336     try {
337
338       folly::MSLGuard g(spinlock_);
339
340       if (UNLIKELY(queue_.empty())) {
341         return false;
342       }
343
344       auto data = std::move(queue_.front());
345       result = data.first;
346       RequestContext::setContext(data.second);
347
348       queue_.pop_front();
349     } catch (...) {
350       // Handle an exception if the assignment operator happens to throw.
351       // We consumed an event but weren't able to pop the message off the
352       // queue.  Signal the event again since the message is still in the
353       // queue.
354       signalEvent(1);
355       throw;
356     }
357
358     return true;
359   }
360
361   int size() {
362     folly::MSLGuard g(spinlock_);
363     return queue_.size();
364   }
365
366   /**
367    * Check that the NotificationQueue is being used from the correct process.
368    *
369    * If you create a NotificationQueue in one process, then fork, and try to
370    * send messages to the queue from the child process, you're going to have a
371    * bad time.  Unfortunately users have (accidentally) run into this.
372    *
373    * Because we use an eventfd/pipe, the child process can actually signal the
374    * parent process that an event is ready.  However, it can't put anything on
375    * the parent's queue, so the parent wakes up and finds an empty queue.  This
376    * check ensures that we catch the problem in the misbehaving child process
377    * code, and crash before signalling the parent process.
378    */
379   void checkPid() const {
380     CHECK_EQ(pid_, getpid());
381   }
382
383  private:
384   // Forbidden copy constructor and assignment operator
385   NotificationQueue(NotificationQueue const &) = delete;
386   NotificationQueue& operator=(NotificationQueue const &) = delete;
387
388   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
389     DCHECK(0 == spinlock_.try_lock());
390     if (maxSize > 0 && queue_.size() >= maxSize) {
391       if (throws) {
392         throw std::overflow_error("unable to add message to NotificationQueue: "
393                                   "queue is full");
394       }
395       return false;
396     }
397     return true;
398   }
399
400   inline void signalEvent(size_t numAdded = 1) const {
401     static const uint8_t kPipeMessage[] = {
402       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
403     };
404
405     ssize_t bytes_written = 0;
406     ssize_t bytes_expected = 0;
407     if (eventfd_ >= 0) {
408       // eventfd(2) dictates that we must write a 64-bit integer
409       uint64_t numAdded64(numAdded);
410       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
411       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
412     } else {
413       // pipe semantics, add one message for each numAdded
414       bytes_expected = numAdded;
415       do {
416         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
417         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
418         if (rc < 0) {
419           // TODO: if the pipe is full, write will fail with EAGAIN.
420           // See task #1044651 for how this could be handled
421           break;
422         }
423         numAdded -= rc;
424         bytes_written += rc;
425       } while (numAdded > 0);
426     }
427     if (bytes_written != bytes_expected) {
428       folly::throwSystemError("failed to signal NotificationQueue after "
429                               "write", errno);
430     }
431   }
432
433   bool tryConsumeEvent() {
434     uint64_t value = 0;
435     ssize_t rc = -1;
436     if (eventfd_ >= 0) {
437       rc = ::read(eventfd_, &value, sizeof(value));
438     } else {
439       uint8_t value8;
440       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
441       value = value8;
442     }
443     if (rc < 0) {
444       // EAGAIN should pretty much be the only error we can ever get.
445       // This means someone else already processed the only available message.
446       assert(errno == EAGAIN);
447       return false;
448     }
449     assert(value == 1);
450     return true;
451   }
452
453   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
454     checkPid();
455     bool signal = false;
456     {
457       folly::MSLGuard g(spinlock_);
458       if (!checkQueueSize(maxSize, throws)) {
459         return false;
460       }
461       // We only need to signal an event if not all consumers are
462       // awake.
463       if (numActiveConsumers_ < numConsumers_) {
464         signal = true;
465       }
466       queue_.push_back(
467         std::make_pair(std::move(message),
468                        RequestContext::saveContext()));
469     }
470     if (signal) {
471       signalEvent();
472     }
473     return true;
474   }
475
476   bool putMessageImpl(
477     const MessageT& message, size_t maxSize, bool throws=true) {
478     checkPid();
479     bool signal = false;
480     {
481       folly::MSLGuard g(spinlock_);
482       if (!checkQueueSize(maxSize, throws)) {
483         return false;
484       }
485       if (numActiveConsumers_ < numConsumers_) {
486         signal = true;
487       }
488       queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
489     }
490     if (signal) {
491       signalEvent();
492     }
493     return true;
494   }
495
496   template<typename InputIteratorT>
497   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
498                        std::input_iterator_tag) {
499     checkPid();
500     bool signal = false;
501     size_t numAdded = 0;
502     {
503       folly::MSLGuard g(spinlock_);
504       while (first != last) {
505         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
506         ++first;
507         ++numAdded;
508       }
509       if (numActiveConsumers_ < numConsumers_) {
510         signal = true;
511       }
512     }
513     if (signal) {
514       signalEvent();
515     }
516   }
517
518   mutable folly::MicroSpinLock spinlock_;
519   int eventfd_;
520   int pipeFds_[2]; // to fallback to on older/non-linux systems
521   uint32_t advisoryMaxQueueSize_;
522   pid_t pid_;
523   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
524   int numConsumers_{0};
525   std::atomic<int> numActiveConsumers_{0};
526 };
527
528 template<typename MessageT>
529 NotificationQueue<MessageT>::Consumer::~Consumer() {
530   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
531   // will be non-nullptr.  Mark the value that it points to, so that
532   // handlerReady() will know the callback is destroyed, and that it cannot
533   // access any member variables anymore.
534   if (destroyedFlagPtr_) {
535     *destroyedFlagPtr_ = true;
536   }
537 }
538
539 template<typename MessageT>
540 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
541     noexcept {
542   uint32_t numProcessed = 0;
543   bool firstRun = true;
544   setActive(true);
545   SCOPE_EXIT { setActive(false); };
546   while (true) {
547     // Try to decrement the eventfd.
548     //
549     // The eventfd is only used to wake up the consumer - there may or
550     // may not actually be an event available (another consumer may
551     // have read it).  We don't really care, we only care about
552     // emptying the queue.
553     if (firstRun) {
554       queue_->tryConsumeEvent();
555       firstRun = false;
556     }
557
558     // Now pop the message off of the queue.
559     //
560     // We have to manually acquire and release the spinlock here, rather than
561     // using SpinLockHolder since the MessageT has to be constructed while
562     // holding the spinlock and available after we release it.  SpinLockHolder
563     // unfortunately doesn't provide a release() method.  (We can't construct
564     // MessageT first since we have no guarantee that MessageT has a default
565     // constructor.
566     queue_->spinlock_.lock();
567     bool locked = true;
568
569     try {
570       if (UNLIKELY(queue_->queue_.empty())) {
571         // If there is no message, we've reached the end of the queue, return.
572         queue_->spinlock_.unlock();
573         return;
574       }
575
576       // Pull a message off the queue.
577       auto& data = queue_->queue_.front();
578
579       MessageT msg(std::move(data.first));
580       auto old_ctx =
581         RequestContext::setContext(data.second);
582       queue_->queue_.pop_front();
583
584       // Check to see if the queue is empty now.
585       // We use this as an optimization to see if we should bother trying to
586       // loop again and read another message after invoking this callback.
587       bool wasEmpty = queue_->queue_.empty();
588       if (wasEmpty) {
589         setActive(false);
590       }
591
592       // Now unlock the spinlock before we invoke the callback.
593       queue_->spinlock_.unlock();
594       locked = false;
595
596       // Call the callback
597       bool callbackDestroyed = false;
598       CHECK(destroyedFlagPtr_ == nullptr);
599       destroyedFlagPtr_ = &callbackDestroyed;
600       messageAvailable(std::move(msg));
601
602       RequestContext::setContext(old_ctx);
603
604       // If the callback was destroyed before it returned, we are done
605       if (callbackDestroyed) {
606         return;
607       }
608       destroyedFlagPtr_ = nullptr;
609
610       // If the callback is no longer installed, we are done.
611       if (queue_ == nullptr) {
612         return;
613       }
614
615       // If we have hit maxReadAtOnce_, we are done.
616       ++numProcessed;
617       if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
618         queue_->signalEvent(1);
619         return;
620       }
621
622       // If the queue was empty before we invoked the callback, it's probable
623       // that it is still empty now.  Just go ahead and return, rather than
624       // looping again and trying to re-read from the eventfd.  (If a new
625       // message had in fact arrived while we were invoking the callback, we
626       // will simply be woken up the next time around the event loop and will
627       // process the message then.)
628       if (wasEmpty) {
629         return;
630       }
631     } catch (const std::exception& ex) {
632       // This catch block is really just to handle the case where the MessageT
633       // constructor throws.  The messageAvailable() callback itself is
634       // declared as noexcept and should never throw.
635       //
636       // If the MessageT constructor does throw we try to handle it as best as
637       // we can, but we can't work miracles.  We will just ignore the error for
638       // now and return.  The next time around the event loop we will end up
639       // trying to read the message again.  If MessageT continues to throw we
640       // will never make forward progress and will keep trying each time around
641       // the event loop.
642       if (locked) {
643         // Unlock the spinlock.
644         queue_->spinlock_.unlock();
645
646         // Push a notification back on the eventfd since we didn't actually
647         // read the message off of the queue.
648         queue_->signalEvent(1);
649       }
650
651       return;
652     }
653   }
654 }
655
656 template<typename MessageT>
657 void NotificationQueue<MessageT>::Consumer::init(
658     EventBase* eventBase,
659     NotificationQueue* queue) {
660   assert(eventBase->isInEventBaseThread());
661   assert(queue_ == nullptr);
662   assert(!isHandlerRegistered());
663   queue->checkPid();
664
665   base_ = eventBase;
666
667   queue_ = queue;
668
669   {
670     folly::MSLGuard g(queue_->spinlock_);
671     queue_->numConsumers_++;
672   }
673   queue_->signalEvent();
674
675   if (queue_->eventfd_ >= 0) {
676     initHandler(eventBase, queue_->eventfd_);
677   } else {
678     initHandler(eventBase, queue_->pipeFds_[0]);
679   }
680 }
681
682 template<typename MessageT>
683 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
684   if (queue_ == nullptr) {
685     assert(!isHandlerRegistered());
686     return;
687   }
688
689   {
690     folly::MSLGuard g(queue_->spinlock_);
691     queue_->numConsumers_--;
692     setActive(false);
693   }
694
695   assert(isHandlerRegistered());
696   unregisterHandler();
697   detachEventBase();
698   queue_ = nullptr;
699 }
700
701 } // folly