Move thrift/lib/cpp/async to folly.
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #pragma once
20
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 #include "folly/io/async/EventBase.h"
25 #include "folly/io/async/EventFDWrapper.h"
26 #include "folly/io/async/EventHandler.h"
27 #include "folly/io/async/Request.h"
28 #include "folly/Likely.h"
29 #include "folly/SmallLocks.h"
30
31 #include <glog/logging.h>
32 #include <deque>
33
34 namespace folly {
35
36 /**
37  * A producer-consumer queue for passing messages between EventBase threads.
38  *
39  * Messages can be added to the queue from any thread.  Multiple consumers may
40  * listen to the queue from multiple EventBase threads.
41  *
42  * A NotificationQueue may not be destroyed while there are still consumers
43  * registered to receive events from the queue.  It is the user's
44  * responsibility to ensure that all consumers are unregistered before the
45  * queue is destroyed.
46  *
47  * MessageT should be MoveConstructible (i.e., must support either a move
48  * constructor or a copy constructor, or both).  Ideally it's move constructor
49  * (or copy constructor if no move constructor is provided) should never throw
50  * exceptions.  If the constructor may throw, the consumers could end up
51  * spinning trying to move a message off the queue and failing, and then
52  * retrying.
53  */
54 template<typename MessageT>
55 class NotificationQueue {
56  public:
57   /**
58    * A callback interface for consuming messages from the queue as they arrive.
59    */
60   class Consumer : private EventHandler {
61    public:
62     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
63
64     Consumer()
65       : queue_(nullptr),
66         destroyedFlagPtr_(nullptr),
67         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
68
69     virtual ~Consumer();
70
71     /**
72      * messageAvailable() will be invoked whenever a new
73      * message is available from the pipe.
74      */
75     virtual void messageAvailable(MessageT&& message) = 0;
76
77     /**
78      * Begin consuming messages from the specified queue.
79      *
80      * messageAvailable() will be called whenever a message is available.  This
81      * consumer will continue to consume messages until stopConsuming() is
82      * called.
83      *
84      * A Consumer may only consume messages from a single NotificationQueue at
85      * a time.  startConsuming() should not be called if this consumer is
86      * already consuming.
87      */
88     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
89       init(eventBase, queue);
90       registerHandler(READ | PERSIST);
91     }
92
93     /**
94      * Same as above but registers this event handler as internal so that it
95      * doesn't count towards the pending reader count for the IOLoop.
96      */
97     void startConsumingInternal(
98         EventBase* eventBase, NotificationQueue* queue) {
99       init(eventBase, queue);
100       registerInternalHandler(READ | PERSIST);
101     }
102
103     /**
104      * Stop consuming messages.
105      *
106      * startConsuming() may be called again to resume consumption of messages
107      * at a later point in time.
108      */
109     void stopConsuming();
110
111     /**
112      * Get the NotificationQueue that this consumer is currently consuming
113      * messages from.  Returns nullptr if the consumer is not currently
114      * consuming events from any queue.
115      */
116     NotificationQueue* getCurrentQueue() const {
117       return queue_;
118     }
119
120     /**
121      * Set a limit on how many messages this consumer will read each iteration
122      * around the event loop.
123      *
124      * This helps rate-limit how much work the Consumer will do each event loop
125      * iteration, to prevent it from starving other event handlers.
126      *
127      * A limit of 0 means no limit will be enforced.  If unset, the limit
128      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
129      */
130     void setMaxReadAtOnce(uint32_t maxAtOnce) {
131       maxReadAtOnce_ = maxAtOnce;
132     }
133     uint32_t getMaxReadAtOnce() const {
134       return maxReadAtOnce_;
135     }
136
137     EventBase* getEventBase() {
138       return base_;
139     }
140
141     virtual void handlerReady(uint16_t events) noexcept;
142
143    private:
144     void init(EventBase* eventBase, NotificationQueue* queue);
145
146     NotificationQueue* queue_;
147     bool* destroyedFlagPtr_;
148     uint32_t maxReadAtOnce_;
149     EventBase* base_;
150   };
151
152   enum class FdType {
153     EVENTFD,
154     PIPE
155   };
156
157   /**
158    * Create a new NotificationQueue.
159    *
160    * If the maxSize parameter is specified, this sets the maximum queue size
161    * that will be enforced by tryPutMessage().  (This size is advisory, and may
162    * be exceeded if producers explicitly use putMessage() instead of
163    * tryPutMessage().)
164    *
165    * The fdType parameter determines the type of file descriptor used
166    * internally to signal message availability.  The default (eventfd) is
167    * preferable for performance and because it won't fail when the queue gets
168    * too long.  It is not available on on older and non-linux kernels, however.
169    * In this case the code will fall back to using a pipe, the parameter is
170    * mostly for testing purposes.
171    */
172   explicit NotificationQueue(uint32_t maxSize = 0,
173                               FdType fdType = FdType::EVENTFD)
174     : eventfd_(-1),
175       pipeFds_{-1, -1},
176       advisoryMaxQueueSize_(maxSize),
177       pid_(getpid()),
178       queue_() {
179
180     spinlock_.init();
181
182     RequestContext::getStaticContext();
183
184     if (fdType == FdType::EVENTFD) {
185       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
186       if (eventfd_ == -1) {
187         if (errno == ENOSYS || errno == EINVAL) {
188           // eventfd not availalble
189           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
190                      << errno << ", falling back to pipe mode (is your kernel "
191                      << "> 2.6.30?)";
192           fdType = FdType::PIPE;
193         } else {
194           // some other error
195           folly::throwSystemError("Failed to create eventfd for "
196                                   "NotificationQueue", errno);
197         }
198       }
199     }
200     if (fdType == FdType::PIPE) {
201       if (pipe(pipeFds_)) {
202         folly::throwSystemError("Failed to create pipe for NotificationQueue",
203                                 errno);
204       }
205       try {
206         // put both ends of the pipe into non-blocking mode
207         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
208           folly::throwSystemError("failed to put NotificationQueue pipe read "
209                                   "endpoint into non-blocking mode", errno);
210         }
211         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
212           folly::throwSystemError("failed to put NotificationQueue pipe write "
213                                   "endpoint into non-blocking mode", errno);
214         }
215       } catch (...) {
216         ::close(pipeFds_[0]);
217         ::close(pipeFds_[1]);
218         throw;
219       }
220     }
221   }
222
223   ~NotificationQueue() {
224     if (eventfd_ >= 0) {
225       ::close(eventfd_);
226       eventfd_ = -1;
227     }
228     if (pipeFds_[0] >= 0) {
229       ::close(pipeFds_[0]);
230       pipeFds_[0] = -1;
231     }
232     if (pipeFds_[1] >= 0) {
233       ::close(pipeFds_[1]);
234       pipeFds_[1] = -1;
235     }
236   }
237
238   /**
239    * Set the advisory maximum queue size.
240    *
241    * This maximum queue size affects calls to tryPutMessage().  Message
242    * producers can still use the putMessage() call to unconditionally put a
243    * message on the queue, ignoring the configured maximum queue size.  This
244    * can cause the queue size to exceed the configured maximum.
245    */
246   void setMaxQueueSize(uint32_t max) {
247     advisoryMaxQueueSize_ = max;
248   }
249
250   /**
251    * Attempt to put a message on the queue if the queue is not already full.
252    *
253    * If the queue is full, a std::overflow_error will be thrown.  The
254    * setMaxQueueSize() function controls the maximum queue size.
255    *
256    * This method may contend briefly on a spinlock if many threads are
257    * concurrently accessing the queue, but for all intents and purposes it will
258    * immediately place the message on the queue and return.
259    *
260    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
261    * may throw any other exception thrown by the MessageT move/copy
262    * constructor.
263    */
264   void tryPutMessage(MessageT&& message) {
265     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
266   }
267   void tryPutMessage(const MessageT& message) {
268     putMessageImpl(message, advisoryMaxQueueSize_);
269   }
270
271   /**
272    * No-throw versions of the above.  Instead returns true on success, false on
273    * failure.
274    *
275    * Only std::overflow_error is prevented from being thrown (since this is the
276    * common exception case), user code must still catch std::bad_alloc errors.
277    */
278   bool tryPutMessageNoThrow(MessageT&& message) {
279     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
280   }
281   bool tryPutMessageNoThrow(const MessageT& message) {
282     return putMessageImpl(message, advisoryMaxQueueSize_, false);
283   }
284
285   /**
286    * Unconditionally put a message on the queue.
287    *
288    * This method is like tryPutMessage(), but ignores the maximum queue size
289    * and always puts the message on the queue, even if the maximum queue size
290    * would be exceeded.
291    *
292    * putMessage() may throw std::bad_alloc if memory allocation fails, and may
293    * throw any other exception thrown by the MessageT move/copy constructor.
294    */
295   void putMessage(MessageT&& message) {
296     putMessageImpl(std::move(message), 0);
297   }
298   void putMessage(const MessageT& message) {
299     putMessageImpl(message, 0);
300   }
301
302   /**
303    * Put several messages on the queue.
304    */
305   template<typename InputIteratorT>
306   void putMessages(InputIteratorT first, InputIteratorT last) {
307     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
308       IterCategory;
309     putMessagesImpl(first, last, IterCategory());
310   }
311
312   /**
313    * Try to immediately pull a message off of the queue, without blocking.
314    *
315    * If a message is immediately available, the result parameter will be
316    * updated to contain the message contents and true will be returned.
317    *
318    * If no message is available, false will be returned and result will be left
319    * unmodified.
320    */
321   bool tryConsume(MessageT& result) {
322     checkPid();
323     if (!tryConsumeEvent()) {
324       return false;
325     }
326
327     try {
328
329       folly::MSLGuard g(spinlock_);
330
331       // This shouldn't happen normally.  See the comments in
332       // Consumer::handlerReady() for more details.
333       if (UNLIKELY(queue_.empty())) {
334         LOG(ERROR) << "found empty queue after signalled event";
335         return false;
336       }
337
338       auto data = std::move(queue_.front());
339       result = data.first;
340       RequestContext::setContext(data.second);
341
342       queue_.pop_front();
343     } catch (...) {
344       // Handle an exception if the assignment operator happens to throw.
345       // We consumed an event but weren't able to pop the message off the
346       // queue.  Signal the event again since the message is still in the
347       // queue.
348       signalEvent(1);
349       throw;
350     }
351
352     return true;
353   }
354
355   int size() {
356     folly::MSLGuard g(spinlock_);
357     return queue_.size();
358   }
359
360   /**
361    * Check that the NotificationQueue is being used from the correct process.
362    *
363    * If you create a NotificationQueue in one process, then fork, and try to
364    * send messages to the queue from the child process, you're going to have a
365    * bad time.  Unfortunately users have (accidentally) run into this.
366    *
367    * Because we use an eventfd/pipe, the child process can actually signal the
368    * parent process that an event is ready.  However, it can't put anything on
369    * the parent's queue, so the parent wakes up and finds an empty queue.  This
370    * check ensures that we catch the problem in the misbehaving child process
371    * code, and crash before signalling the parent process.
372    */
373   void checkPid() const {
374     CHECK_EQ(pid_, getpid());
375   }
376
377  private:
378   // Forbidden copy constructor and assignment operator
379   NotificationQueue(NotificationQueue const &) = delete;
380   NotificationQueue& operator=(NotificationQueue const &) = delete;
381
382   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
383     DCHECK(0 == spinlock_.try_lock());
384     if (maxSize > 0 && queue_.size() >= maxSize) {
385       if (throws) {
386         throw std::overflow_error("unable to add message to NotificationQueue: "
387                                   "queue is full");
388       }
389       return false;
390     }
391     return true;
392   }
393
394   inline void signalEvent(size_t numAdded = 1) const {
395     static const uint8_t kPipeMessage[] = {
396       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
397     };
398
399     ssize_t bytes_written = 0;
400     ssize_t bytes_expected = 0;
401     if (eventfd_ >= 0) {
402       // eventfd(2) dictates that we must write a 64-bit integer
403       uint64_t numAdded64(numAdded);
404       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
405       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
406     } else {
407       // pipe semantics, add one message for each numAdded
408       bytes_expected = numAdded;
409       do {
410         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
411         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
412         if (rc < 0) {
413           // TODO: if the pipe is full, write will fail with EAGAIN.
414           // See task #1044651 for how this could be handled
415           break;
416         }
417         numAdded -= rc;
418         bytes_written += rc;
419       } while (numAdded > 0);
420     }
421     if (bytes_written != bytes_expected) {
422       folly::throwSystemError("failed to signal NotificationQueue after "
423                               "write", errno);
424     }
425   }
426
427   bool tryConsumeEvent() {
428     uint64_t value = 0;
429     ssize_t rc = -1;
430     if (eventfd_ >= 0) {
431       rc = ::read(eventfd_, &value, sizeof(value));
432     } else {
433       uint8_t value8;
434       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
435       value = value8;
436     }
437     if (rc < 0) {
438       // EAGAIN should pretty much be the only error we can ever get.
439       // This means someone else already processed the only available message.
440       assert(errno == EAGAIN);
441       return false;
442     }
443     assert(value == 1);
444     return true;
445   }
446
447   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
448     checkPid();
449     {
450       folly::MSLGuard g(spinlock_);
451       if (!checkQueueSize(maxSize, throws)) {
452         return false;
453       }
454       queue_.push_back(
455         std::make_pair(std::move(message),
456                        RequestContext::saveContext()));
457     }
458     signalEvent();
459     return true;
460   }
461
462   bool putMessageImpl(
463     const MessageT& message, size_t maxSize, bool throws=true) {
464     checkPid();
465     {
466       folly::MSLGuard g(spinlock_);
467       if (!checkQueueSize(maxSize, throws)) {
468         return false;
469       }
470       queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
471     }
472     signalEvent();
473     return true;
474   }
475
476   template<typename InputIteratorT>
477   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
478                        std::input_iterator_tag) {
479     checkPid();
480     size_t numAdded = 0;
481     {
482       folly::MSLGuard g(spinlock_);
483       while (first != last) {
484         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
485         ++first;
486         ++numAdded;
487       }
488     }
489     signalEvent(numAdded);
490   }
491
492   mutable folly::MicroSpinLock spinlock_;
493   int eventfd_;
494   int pipeFds_[2]; // to fallback to on older/non-linux systems
495   uint32_t advisoryMaxQueueSize_;
496   pid_t pid_;
497   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
498 };
499
500 template<typename MessageT>
501 NotificationQueue<MessageT>::Consumer::~Consumer() {
502   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
503   // will be non-nullptr.  Mark the value that it points to, so that
504   // handlerReady() will know the callback is destroyed, and that it cannot
505   // access any member variables anymore.
506   if (destroyedFlagPtr_) {
507     *destroyedFlagPtr_ = true;
508   }
509 }
510
511 template<typename MessageT>
512 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
513     noexcept {
514   uint32_t numProcessed = 0;
515   while (true) {
516     // Try to decrement the eventfd.
517     //
518     // We decrement the eventfd before checking the queue, and only pop a
519     // message off the queue if we read from the eventfd.
520     //
521     // Reading the eventfd first allows us to not have to hold the spinlock
522     // while accessing the eventfd.  If we popped from the queue first, we
523     // would have to hold the lock while reading from or writing to the
524     // eventfd.  (Multiple consumers may be woken up from a single eventfd
525     // notification.  If we popped from the queue first, we could end up
526     // popping a message from the queue before the eventfd has been notified by
527     // the producer, unless the consumer and producer both held the spinlock
528     // around the entire operation.)
529     if (!queue_->tryConsumeEvent()) {
530       // no message available right now
531       return;
532     }
533
534     // Now pop the message off of the queue.
535     // We successfully consumed the eventfd notification.
536     // There should be a message available for us to consume.
537     //
538     // We have to manually acquire and release the spinlock here, rather than
539     // using SpinLockHolder since the MessageT has to be constructed while
540     // holding the spinlock and available after we release it.  SpinLockHolder
541     // unfortunately doesn't provide a release() method.  (We can't construct
542     // MessageT first since we have no guarantee that MessageT has a default
543     // constructor.
544     queue_->spinlock_.lock();
545     bool locked = true;
546
547     try {
548       // The eventfd is incremented once for every message, and only
549       // decremented when a message is popped off.  There should always be a
550       // message here to read.
551       if (UNLIKELY(queue_->queue_.empty())) {
552         // Unfortunately we have seen this happen in practice if a user forks
553         // the process, and then the child tries to send a message to a
554         // NotificationQueue being monitored by a thread in the parent.
555         // The child can signal the parent via the eventfd, but won't have been
556         // able to put anything on the parent's queue since it has a separate
557         // address space.
558         //
559         // This is a bug in the sender's code.  putMessagesImpl() should cause
560         // the sender to crash now before trying to send a message from the
561         // wrong process.  However, just in case let's handle this case in the
562         // consumer without crashing.
563         LOG(ERROR) << "found empty queue after signalled event";
564         queue_->spinlock_.unlock();
565         return;
566       }
567
568       // Pull a message off the queue.
569       auto& data = queue_->queue_.front();
570
571       MessageT msg(std::move(data.first));
572       auto old_ctx =
573         RequestContext::setContext(data.second);
574       queue_->queue_.pop_front();
575
576       // Check to see if the queue is empty now.
577       // We use this as an optimization to see if we should bother trying to
578       // loop again and read another message after invoking this callback.
579       bool wasEmpty = queue_->queue_.empty();
580
581       // Now unlock the spinlock before we invoke the callback.
582       queue_->spinlock_.unlock();
583       locked = false;
584
585       // Call the callback
586       bool callbackDestroyed = false;
587       CHECK(destroyedFlagPtr_ == nullptr);
588       destroyedFlagPtr_ = &callbackDestroyed;
589       messageAvailable(std::move(msg));
590
591       RequestContext::setContext(old_ctx);
592
593       // If the callback was destroyed before it returned, we are done
594       if (callbackDestroyed) {
595         return;
596       }
597       destroyedFlagPtr_ = nullptr;
598
599       // If the callback is no longer installed, we are done.
600       if (queue_ == nullptr) {
601         return;
602       }
603
604       // If we have hit maxReadAtOnce_, we are done.
605       ++numProcessed;
606       if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
607         return;
608       }
609
610       // If the queue was empty before we invoked the callback, it's probable
611       // that it is still empty now.  Just go ahead and return, rather than
612       // looping again and trying to re-read from the eventfd.  (If a new
613       // message had in fact arrived while we were invoking the callback, we
614       // will simply be woken up the next time around the event loop and will
615       // process the message then.)
616       if (wasEmpty) {
617         return;
618       }
619     } catch (const std::exception& ex) {
620       // This catch block is really just to handle the case where the MessageT
621       // constructor throws.  The messageAvailable() callback itself is
622       // declared as noexcept and should never throw.
623       //
624       // If the MessageT constructor does throw we try to handle it as best as
625       // we can, but we can't work miracles.  We will just ignore the error for
626       // now and return.  The next time around the event loop we will end up
627       // trying to read the message again.  If MessageT continues to throw we
628       // will never make forward progress and will keep trying each time around
629       // the event loop.
630       if (locked) {
631         // Unlock the spinlock.
632         queue_->spinlock_.unlock();
633
634         // Push a notification back on the eventfd since we didn't actually
635         // read the message off of the queue.
636         queue_->signalEvent(1);
637       }
638
639       return;
640     }
641   }
642 }
643
644 template<typename MessageT>
645 void NotificationQueue<MessageT>::Consumer::init(
646     EventBase* eventBase,
647     NotificationQueue* queue) {
648   assert(eventBase->isInEventBaseThread());
649   assert(queue_ == nullptr);
650   assert(!isHandlerRegistered());
651   queue->checkPid();
652
653   base_ = eventBase;
654
655   queue_ = queue;
656   if (queue_->eventfd_ >= 0) {
657     initHandler(eventBase, queue_->eventfd_);
658   } else {
659     initHandler(eventBase, queue_->pipeFds_[0]);
660   }
661 }
662
663 template<typename MessageT>
664 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
665   if (queue_ == nullptr) {
666     assert(!isHandlerRegistered());
667     return;
668   }
669
670   assert(isHandlerRegistered());
671   unregisterHandler();
672   detachEventBase();
673   queue_ = nullptr;
674 }
675
676 } // folly