Fix race in Notification Queue
[folly.git] / folly / io / async / NotificationQueue.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventFDWrapper.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/io/async/Request.h>
26 #include <folly/Likely.h>
27 #include <folly/SmallLocks.h>
28 #include <folly/ScopeGuard.h>
29
30 #include <glog/logging.h>
31 #include <deque>
32
33 namespace folly {
34
35 /**
36  * A producer-consumer queue for passing messages between EventBase threads.
37  *
38  * Messages can be added to the queue from any thread.  Multiple consumers may
39  * listen to the queue from multiple EventBase threads.
40  *
41  * A NotificationQueue may not be destroyed while there are still consumers
42  * registered to receive events from the queue.  It is the user's
43  * responsibility to ensure that all consumers are unregistered before the
44  * queue is destroyed.
45  *
46  * MessageT should be MoveConstructible (i.e., must support either a move
47  * constructor or a copy constructor, or both).  Ideally it's move constructor
48  * (or copy constructor if no move constructor is provided) should never throw
49  * exceptions.  If the constructor may throw, the consumers could end up
50  * spinning trying to move a message off the queue and failing, and then
51  * retrying.
52  */
53 template<typename MessageT>
54 class NotificationQueue {
55  public:
56   /**
57    * A callback interface for consuming messages from the queue as they arrive.
58    */
59   class Consumer : private EventHandler {
60    public:
61     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
62
63     Consumer()
64       : queue_(nullptr),
65         destroyedFlagPtr_(nullptr),
66         maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
67
68     virtual ~Consumer();
69
70     /**
71      * messageAvailable() will be invoked whenever a new
72      * message is available from the pipe.
73      */
74     virtual void messageAvailable(MessageT&& message) = 0;
75
76     /**
77      * Begin consuming messages from the specified queue.
78      *
79      * messageAvailable() will be called whenever a message is available.  This
80      * consumer will continue to consume messages until stopConsuming() is
81      * called.
82      *
83      * A Consumer may only consume messages from a single NotificationQueue at
84      * a time.  startConsuming() should not be called if this consumer is
85      * already consuming.
86      */
87     void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
88       init(eventBase, queue);
89       registerHandler(READ | PERSIST);
90     }
91
92     /**
93      * Same as above but registers this event handler as internal so that it
94      * doesn't count towards the pending reader count for the IOLoop.
95      */
96     void startConsumingInternal(
97         EventBase* eventBase, NotificationQueue* queue) {
98       init(eventBase, queue);
99       registerInternalHandler(READ | PERSIST);
100     }
101
102     /**
103      * Stop consuming messages.
104      *
105      * startConsuming() may be called again to resume consumption of messages
106      * at a later point in time.
107      */
108     void stopConsuming();
109
110     /**
111      * Get the NotificationQueue that this consumer is currently consuming
112      * messages from.  Returns nullptr if the consumer is not currently
113      * consuming events from any queue.
114      */
115     NotificationQueue* getCurrentQueue() const {
116       return queue_;
117     }
118
119     /**
120      * Set a limit on how many messages this consumer will read each iteration
121      * around the event loop.
122      *
123      * This helps rate-limit how much work the Consumer will do each event loop
124      * iteration, to prevent it from starving other event handlers.
125      *
126      * A limit of 0 means no limit will be enforced.  If unset, the limit
127      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
128      */
129     void setMaxReadAtOnce(uint32_t maxAtOnce) {
130       maxReadAtOnce_ = maxAtOnce;
131     }
132     uint32_t getMaxReadAtOnce() const {
133       return maxReadAtOnce_;
134     }
135
136     EventBase* getEventBase() {
137       return base_;
138     }
139
140     virtual void handlerReady(uint16_t events) noexcept;
141
142    private:
143
144     void setActive(bool active, bool shouldLock = false) {
145       if (!queue_) {
146         active_ = active;
147         return;
148       }
149       if (shouldLock) {
150         queue_->spinlock_.lock();
151       }
152       if (!active_ && active) {
153         ++queue_->numActiveConsumers_;
154       } else if (active_ && !active) {
155         --queue_->numActiveConsumers_;
156       }
157       active_ = active;
158       if (shouldLock) {
159         queue_->spinlock_.unlock();
160       }
161     }
162     void init(EventBase* eventBase, NotificationQueue* queue);
163
164     NotificationQueue* queue_;
165     bool* destroyedFlagPtr_;
166     uint32_t maxReadAtOnce_;
167     EventBase* base_;
168     bool active_{false};
169   };
170
171   enum class FdType {
172     EVENTFD,
173     PIPE
174   };
175
176   /**
177    * Create a new NotificationQueue.
178    *
179    * If the maxSize parameter is specified, this sets the maximum queue size
180    * that will be enforced by tryPutMessage().  (This size is advisory, and may
181    * be exceeded if producers explicitly use putMessage() instead of
182    * tryPutMessage().)
183    *
184    * The fdType parameter determines the type of file descriptor used
185    * internally to signal message availability.  The default (eventfd) is
186    * preferable for performance and because it won't fail when the queue gets
187    * too long.  It is not available on on older and non-linux kernels, however.
188    * In this case the code will fall back to using a pipe, the parameter is
189    * mostly for testing purposes.
190    */
191   explicit NotificationQueue(uint32_t maxSize = 0,
192                               FdType fdType = FdType::EVENTFD)
193     : eventfd_(-1),
194       pipeFds_{-1, -1},
195       advisoryMaxQueueSize_(maxSize),
196       pid_(getpid()),
197       queue_() {
198
199     spinlock_.init();
200
201     RequestContext::getStaticContext();
202
203     if (fdType == FdType::EVENTFD) {
204       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
205       if (eventfd_ == -1) {
206         if (errno == ENOSYS || errno == EINVAL) {
207           // eventfd not availalble
208           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
209                      << errno << ", falling back to pipe mode (is your kernel "
210                      << "> 2.6.30?)";
211           fdType = FdType::PIPE;
212         } else {
213           // some other error
214           folly::throwSystemError("Failed to create eventfd for "
215                                   "NotificationQueue", errno);
216         }
217       }
218     }
219     if (fdType == FdType::PIPE) {
220       if (pipe(pipeFds_)) {
221         folly::throwSystemError("Failed to create pipe for NotificationQueue",
222                                 errno);
223       }
224       try {
225         // put both ends of the pipe into non-blocking mode
226         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
227           folly::throwSystemError("failed to put NotificationQueue pipe read "
228                                   "endpoint into non-blocking mode", errno);
229         }
230         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
231           folly::throwSystemError("failed to put NotificationQueue pipe write "
232                                   "endpoint into non-blocking mode", errno);
233         }
234       } catch (...) {
235         ::close(pipeFds_[0]);
236         ::close(pipeFds_[1]);
237         throw;
238       }
239     }
240   }
241
242   ~NotificationQueue() {
243     if (eventfd_ >= 0) {
244       ::close(eventfd_);
245       eventfd_ = -1;
246     }
247     if (pipeFds_[0] >= 0) {
248       ::close(pipeFds_[0]);
249       pipeFds_[0] = -1;
250     }
251     if (pipeFds_[1] >= 0) {
252       ::close(pipeFds_[1]);
253       pipeFds_[1] = -1;
254     }
255   }
256
257   /**
258    * Set the advisory maximum queue size.
259    *
260    * This maximum queue size affects calls to tryPutMessage().  Message
261    * producers can still use the putMessage() call to unconditionally put a
262    * message on the queue, ignoring the configured maximum queue size.  This
263    * can cause the queue size to exceed the configured maximum.
264    */
265   void setMaxQueueSize(uint32_t max) {
266     advisoryMaxQueueSize_ = max;
267   }
268
269   /**
270    * Attempt to put a message on the queue if the queue is not already full.
271    *
272    * If the queue is full, a std::overflow_error will be thrown.  The
273    * setMaxQueueSize() function controls the maximum queue size.
274    *
275    * This method may contend briefly on a spinlock if many threads are
276    * concurrently accessing the queue, but for all intents and purposes it will
277    * immediately place the message on the queue and return.
278    *
279    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
280    * may throw any other exception thrown by the MessageT move/copy
281    * constructor.
282    */
283   void tryPutMessage(MessageT&& message) {
284     putMessageImpl(std::move(message), advisoryMaxQueueSize_);
285   }
286   void tryPutMessage(const MessageT& message) {
287     putMessageImpl(message, advisoryMaxQueueSize_);
288   }
289
290   /**
291    * No-throw versions of the above.  Instead returns true on success, false on
292    * failure.
293    *
294    * Only std::overflow_error is prevented from being thrown (since this is the
295    * common exception case), user code must still catch std::bad_alloc errors.
296    */
297   bool tryPutMessageNoThrow(MessageT&& message) {
298     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
299   }
300   bool tryPutMessageNoThrow(const MessageT& message) {
301     return putMessageImpl(message, advisoryMaxQueueSize_, false);
302   }
303
304   /**
305    * Unconditionally put a message on the queue.
306    *
307    * This method is like tryPutMessage(), but ignores the maximum queue size
308    * and always puts the message on the queue, even if the maximum queue size
309    * would be exceeded.
310    *
311    * putMessage() may throw std::bad_alloc if memory allocation fails, and may
312    * throw any other exception thrown by the MessageT move/copy constructor.
313    */
314   void putMessage(MessageT&& message) {
315     putMessageImpl(std::move(message), 0);
316   }
317   void putMessage(const MessageT& message) {
318     putMessageImpl(message, 0);
319   }
320
321   /**
322    * Put several messages on the queue.
323    */
324   template<typename InputIteratorT>
325   void putMessages(InputIteratorT first, InputIteratorT last) {
326     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
327       IterCategory;
328     putMessagesImpl(first, last, IterCategory());
329   }
330
331   /**
332    * Try to immediately pull a message off of the queue, without blocking.
333    *
334    * If a message is immediately available, the result parameter will be
335    * updated to contain the message contents and true will be returned.
336    *
337    * If no message is available, false will be returned and result will be left
338    * unmodified.
339    */
340   bool tryConsume(MessageT& result) {
341     checkPid();
342
343     try {
344
345       folly::MSLGuard g(spinlock_);
346
347       if (UNLIKELY(queue_.empty())) {
348         return false;
349       }
350
351       auto data = std::move(queue_.front());
352       result = data.first;
353       RequestContext::setContext(data.second);
354
355       queue_.pop_front();
356     } catch (...) {
357       // Handle an exception if the assignment operator happens to throw.
358       // We consumed an event but weren't able to pop the message off the
359       // queue.  Signal the event again since the message is still in the
360       // queue.
361       signalEvent(1);
362       throw;
363     }
364
365     return true;
366   }
367
368   int size() {
369     folly::MSLGuard g(spinlock_);
370     return queue_.size();
371   }
372
373   /**
374    * Check that the NotificationQueue is being used from the correct process.
375    *
376    * If you create a NotificationQueue in one process, then fork, and try to
377    * send messages to the queue from the child process, you're going to have a
378    * bad time.  Unfortunately users have (accidentally) run into this.
379    *
380    * Because we use an eventfd/pipe, the child process can actually signal the
381    * parent process that an event is ready.  However, it can't put anything on
382    * the parent's queue, so the parent wakes up and finds an empty queue.  This
383    * check ensures that we catch the problem in the misbehaving child process
384    * code, and crash before signalling the parent process.
385    */
386   void checkPid() const {
387     CHECK_EQ(pid_, getpid());
388   }
389
390  private:
391   // Forbidden copy constructor and assignment operator
392   NotificationQueue(NotificationQueue const &) = delete;
393   NotificationQueue& operator=(NotificationQueue const &) = delete;
394
395   inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
396     DCHECK(0 == spinlock_.try_lock());
397     if (maxSize > 0 && queue_.size() >= maxSize) {
398       if (throws) {
399         throw std::overflow_error("unable to add message to NotificationQueue: "
400                                   "queue is full");
401       }
402       return false;
403     }
404     return true;
405   }
406
407   inline void signalEvent(size_t numAdded = 1) const {
408     static const uint8_t kPipeMessage[] = {
409       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
410     };
411
412     ssize_t bytes_written = 0;
413     ssize_t bytes_expected = 0;
414     if (eventfd_ >= 0) {
415       // eventfd(2) dictates that we must write a 64-bit integer
416       uint64_t numAdded64(numAdded);
417       bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
418       bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
419     } else {
420       // pipe semantics, add one message for each numAdded
421       bytes_expected = numAdded;
422       do {
423         size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
424         ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
425         if (rc < 0) {
426           // TODO: if the pipe is full, write will fail with EAGAIN.
427           // See task #1044651 for how this could be handled
428           break;
429         }
430         numAdded -= rc;
431         bytes_written += rc;
432       } while (numAdded > 0);
433     }
434     if (bytes_written != bytes_expected) {
435       folly::throwSystemError("failed to signal NotificationQueue after "
436                               "write", errno);
437     }
438   }
439
440   bool tryConsumeEvent() {
441     uint64_t value = 0;
442     ssize_t rc = -1;
443     if (eventfd_ >= 0) {
444       rc = ::read(eventfd_, &value, sizeof(value));
445     } else {
446       uint8_t value8;
447       rc = ::read(pipeFds_[0], &value8, sizeof(value8));
448       value = value8;
449     }
450     if (rc < 0) {
451       // EAGAIN should pretty much be the only error we can ever get.
452       // This means someone else already processed the only available message.
453       assert(errno == EAGAIN);
454       return false;
455     }
456     assert(value == 1);
457     return true;
458   }
459
460   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
461     checkPid();
462     bool signal = false;
463     {
464       folly::MSLGuard g(spinlock_);
465       if (!checkQueueSize(maxSize, throws)) {
466         return false;
467       }
468       // We only need to signal an event if not all consumers are
469       // awake.
470       if (numActiveConsumers_ < numConsumers_) {
471         signal = true;
472       }
473       queue_.push_back(
474         std::make_pair(std::move(message),
475                        RequestContext::saveContext()));
476     }
477     if (signal) {
478       signalEvent();
479     }
480     return true;
481   }
482
483   bool putMessageImpl(
484     const MessageT& message, size_t maxSize, bool throws=true) {
485     checkPid();
486     bool signal = false;
487     {
488       folly::MSLGuard g(spinlock_);
489       if (!checkQueueSize(maxSize, throws)) {
490         return false;
491       }
492       if (numActiveConsumers_ < numConsumers_) {
493         signal = true;
494       }
495       queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
496     }
497     if (signal) {
498       signalEvent();
499     }
500     return true;
501   }
502
503   template<typename InputIteratorT>
504   void putMessagesImpl(InputIteratorT first, InputIteratorT last,
505                        std::input_iterator_tag) {
506     checkPid();
507     bool signal = false;
508     size_t numAdded = 0;
509     {
510       folly::MSLGuard g(spinlock_);
511       while (first != last) {
512         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
513         ++first;
514         ++numAdded;
515       }
516       if (numActiveConsumers_ < numConsumers_) {
517         signal = true;
518       }
519     }
520     if (signal) {
521       signalEvent();
522     }
523   }
524
525   mutable folly::MicroSpinLock spinlock_;
526   int eventfd_;
527   int pipeFds_[2]; // to fallback to on older/non-linux systems
528   uint32_t advisoryMaxQueueSize_;
529   pid_t pid_;
530   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
531   int numConsumers_{0};
532   std::atomic<int> numActiveConsumers_{0};
533 };
534
535 template<typename MessageT>
536 NotificationQueue<MessageT>::Consumer::~Consumer() {
537   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
538   // will be non-nullptr.  Mark the value that it points to, so that
539   // handlerReady() will know the callback is destroyed, and that it cannot
540   // access any member variables anymore.
541   if (destroyedFlagPtr_) {
542     *destroyedFlagPtr_ = true;
543   }
544 }
545
546 template<typename MessageT>
547 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
548     noexcept {
549   uint32_t numProcessed = 0;
550   bool firstRun = true;
551   setActive(true);
552   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
553   while (true) {
554     // Try to decrement the eventfd.
555     //
556     // The eventfd is only used to wake up the consumer - there may or
557     // may not actually be an event available (another consumer may
558     // have read it).  We don't really care, we only care about
559     // emptying the queue.
560     if (firstRun) {
561       queue_->tryConsumeEvent();
562       firstRun = false;
563     }
564
565     // Now pop the message off of the queue.
566     //
567     // We have to manually acquire and release the spinlock here, rather than
568     // using SpinLockHolder since the MessageT has to be constructed while
569     // holding the spinlock and available after we release it.  SpinLockHolder
570     // unfortunately doesn't provide a release() method.  (We can't construct
571     // MessageT first since we have no guarantee that MessageT has a default
572     // constructor.
573     queue_->spinlock_.lock();
574     bool locked = true;
575
576     try {
577       if (UNLIKELY(queue_->queue_.empty())) {
578         // If there is no message, we've reached the end of the queue, return.
579         queue_->spinlock_.unlock();
580         return;
581       }
582
583       // Pull a message off the queue.
584       auto& data = queue_->queue_.front();
585
586       MessageT msg(std::move(data.first));
587       auto old_ctx =
588         RequestContext::setContext(data.second);
589       queue_->queue_.pop_front();
590
591       // Check to see if the queue is empty now.
592       // We use this as an optimization to see if we should bother trying to
593       // loop again and read another message after invoking this callback.
594       bool wasEmpty = queue_->queue_.empty();
595       if (wasEmpty) {
596         setActive(false);
597       }
598
599       // Now unlock the spinlock before we invoke the callback.
600       queue_->spinlock_.unlock();
601       locked = false;
602
603       // Call the callback
604       bool callbackDestroyed = false;
605       CHECK(destroyedFlagPtr_ == nullptr);
606       destroyedFlagPtr_ = &callbackDestroyed;
607       messageAvailable(std::move(msg));
608
609       RequestContext::setContext(old_ctx);
610
611       // If the callback was destroyed before it returned, we are done
612       if (callbackDestroyed) {
613         return;
614       }
615       destroyedFlagPtr_ = nullptr;
616
617       // If the callback is no longer installed, we are done.
618       if (queue_ == nullptr) {
619         return;
620       }
621
622       // If we have hit maxReadAtOnce_, we are done.
623       ++numProcessed;
624       if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
625         queue_->signalEvent(1);
626         return;
627       }
628
629       // If the queue was empty before we invoked the callback, it's probable
630       // that it is still empty now.  Just go ahead and return, rather than
631       // looping again and trying to re-read from the eventfd.  (If a new
632       // message had in fact arrived while we were invoking the callback, we
633       // will simply be woken up the next time around the event loop and will
634       // process the message then.)
635       if (wasEmpty) {
636         return;
637       }
638     } catch (const std::exception& ex) {
639       // This catch block is really just to handle the case where the MessageT
640       // constructor throws.  The messageAvailable() callback itself is
641       // declared as noexcept and should never throw.
642       //
643       // If the MessageT constructor does throw we try to handle it as best as
644       // we can, but we can't work miracles.  We will just ignore the error for
645       // now and return.  The next time around the event loop we will end up
646       // trying to read the message again.  If MessageT continues to throw we
647       // will never make forward progress and will keep trying each time around
648       // the event loop.
649       if (locked) {
650         // Unlock the spinlock.
651         queue_->spinlock_.unlock();
652
653         // Push a notification back on the eventfd since we didn't actually
654         // read the message off of the queue.
655         queue_->signalEvent(1);
656       }
657
658       return;
659     }
660   }
661 }
662
663 template<typename MessageT>
664 void NotificationQueue<MessageT>::Consumer::init(
665     EventBase* eventBase,
666     NotificationQueue* queue) {
667   assert(eventBase->isInEventBaseThread());
668   assert(queue_ == nullptr);
669   assert(!isHandlerRegistered());
670   queue->checkPid();
671
672   base_ = eventBase;
673
674   queue_ = queue;
675
676   {
677     folly::MSLGuard g(queue_->spinlock_);
678     queue_->numConsumers_++;
679   }
680   queue_->signalEvent();
681
682   if (queue_->eventfd_ >= 0) {
683     initHandler(eventBase, queue_->eventfd_);
684   } else {
685     initHandler(eventBase, queue_->pipeFds_[0]);
686   }
687 }
688
689 template<typename MessageT>
690 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
691   if (queue_ == nullptr) {
692     assert(!isHandlerRegistered());
693     return;
694   }
695
696   {
697     folly::MSLGuard g(queue_->spinlock_);
698     queue_->numConsumers_--;
699     setActive(false);
700   }
701
702   assert(isHandlerRegistered());
703   unregisterHandler();
704   detachEventBase();
705   queue_ = nullptr;
706 }
707
708 } // folly