2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventFDWrapper.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/io/async/Request.h>
26 #include <folly/Likely.h>
27 #include <folly/SmallLocks.h>
28 #include <folly/ScopeGuard.h>
30 #include <glog/logging.h>
36 * A producer-consumer queue for passing messages between EventBase threads.
38 * Messages can be added to the queue from any thread. Multiple consumers may
39 * listen to the queue from multiple EventBase threads.
41 * A NotificationQueue may not be destroyed while there are still consumers
42 * registered to receive events from the queue. It is the user's
43 * responsibility to ensure that all consumers are unregistered before the
46 * MessageT should be MoveConstructible (i.e., must support either a move
47 * constructor or a copy constructor, or both). Ideally it's move constructor
48 * (or copy constructor if no move constructor is provided) should never throw
49 * exceptions. If the constructor may throw, the consumers could end up
50 * spinning trying to move a message off the queue and failing, and then
53 template<typename MessageT>
54 class NotificationQueue {
57 * A callback interface for consuming messages from the queue as they arrive.
59 class Consumer : private EventHandler {
61 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
65 destroyedFlagPtr_(nullptr),
66 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
71 * messageAvailable() will be invoked whenever a new
72 * message is available from the pipe.
74 virtual void messageAvailable(MessageT&& message) = 0;
77 * Begin consuming messages from the specified queue.
79 * messageAvailable() will be called whenever a message is available. This
80 * consumer will continue to consume messages until stopConsuming() is
83 * A Consumer may only consume messages from a single NotificationQueue at
84 * a time. startConsuming() should not be called if this consumer is
87 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
88 init(eventBase, queue);
89 registerHandler(READ | PERSIST);
93 * Same as above but registers this event handler as internal so that it
94 * doesn't count towards the pending reader count for the IOLoop.
96 void startConsumingInternal(
97 EventBase* eventBase, NotificationQueue* queue) {
98 init(eventBase, queue);
99 registerInternalHandler(READ | PERSIST);
103 * Stop consuming messages.
105 * startConsuming() may be called again to resume consumption of messages
106 * at a later point in time.
108 void stopConsuming();
111 * Get the NotificationQueue that this consumer is currently consuming
112 * messages from. Returns nullptr if the consumer is not currently
113 * consuming events from any queue.
115 NotificationQueue* getCurrentQueue() const {
120 * Set a limit on how many messages this consumer will read each iteration
121 * around the event loop.
123 * This helps rate-limit how much work the Consumer will do each event loop
124 * iteration, to prevent it from starving other event handlers.
126 * A limit of 0 means no limit will be enforced. If unset, the limit
127 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
129 void setMaxReadAtOnce(uint32_t maxAtOnce) {
130 maxReadAtOnce_ = maxAtOnce;
132 uint32_t getMaxReadAtOnce() const {
133 return maxReadAtOnce_;
136 EventBase* getEventBase() {
140 virtual void handlerReady(uint16_t events) noexcept;
144 void setActive(bool active, bool shouldLock = false) {
150 queue_->spinlock_.lock();
152 if (!active_ && active) {
153 ++queue_->numActiveConsumers_;
154 } else if (active_ && !active) {
155 --queue_->numActiveConsumers_;
159 queue_->spinlock_.unlock();
162 void init(EventBase* eventBase, NotificationQueue* queue);
164 NotificationQueue* queue_;
165 bool* destroyedFlagPtr_;
166 uint32_t maxReadAtOnce_;
177 * Create a new NotificationQueue.
179 * If the maxSize parameter is specified, this sets the maximum queue size
180 * that will be enforced by tryPutMessage(). (This size is advisory, and may
181 * be exceeded if producers explicitly use putMessage() instead of
184 * The fdType parameter determines the type of file descriptor used
185 * internally to signal message availability. The default (eventfd) is
186 * preferable for performance and because it won't fail when the queue gets
187 * too long. It is not available on on older and non-linux kernels, however.
188 * In this case the code will fall back to using a pipe, the parameter is
189 * mostly for testing purposes.
191 explicit NotificationQueue(uint32_t maxSize = 0,
192 FdType fdType = FdType::EVENTFD)
195 advisoryMaxQueueSize_(maxSize),
201 RequestContext::getStaticContext();
203 if (fdType == FdType::EVENTFD) {
204 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
205 if (eventfd_ == -1) {
206 if (errno == ENOSYS || errno == EINVAL) {
207 // eventfd not availalble
208 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
209 << errno << ", falling back to pipe mode (is your kernel "
211 fdType = FdType::PIPE;
214 folly::throwSystemError("Failed to create eventfd for "
215 "NotificationQueue", errno);
219 if (fdType == FdType::PIPE) {
220 if (pipe(pipeFds_)) {
221 folly::throwSystemError("Failed to create pipe for NotificationQueue",
225 // put both ends of the pipe into non-blocking mode
226 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
227 folly::throwSystemError("failed to put NotificationQueue pipe read "
228 "endpoint into non-blocking mode", errno);
230 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
231 folly::throwSystemError("failed to put NotificationQueue pipe write "
232 "endpoint into non-blocking mode", errno);
235 ::close(pipeFds_[0]);
236 ::close(pipeFds_[1]);
242 ~NotificationQueue() {
247 if (pipeFds_[0] >= 0) {
248 ::close(pipeFds_[0]);
251 if (pipeFds_[1] >= 0) {
252 ::close(pipeFds_[1]);
258 * Set the advisory maximum queue size.
260 * This maximum queue size affects calls to tryPutMessage(). Message
261 * producers can still use the putMessage() call to unconditionally put a
262 * message on the queue, ignoring the configured maximum queue size. This
263 * can cause the queue size to exceed the configured maximum.
265 void setMaxQueueSize(uint32_t max) {
266 advisoryMaxQueueSize_ = max;
270 * Attempt to put a message on the queue if the queue is not already full.
272 * If the queue is full, a std::overflow_error will be thrown. The
273 * setMaxQueueSize() function controls the maximum queue size.
275 * This method may contend briefly on a spinlock if many threads are
276 * concurrently accessing the queue, but for all intents and purposes it will
277 * immediately place the message on the queue and return.
279 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
280 * may throw any other exception thrown by the MessageT move/copy
283 void tryPutMessage(MessageT&& message) {
284 putMessageImpl(std::move(message), advisoryMaxQueueSize_);
286 void tryPutMessage(const MessageT& message) {
287 putMessageImpl(message, advisoryMaxQueueSize_);
291 * No-throw versions of the above. Instead returns true on success, false on
294 * Only std::overflow_error is prevented from being thrown (since this is the
295 * common exception case), user code must still catch std::bad_alloc errors.
297 bool tryPutMessageNoThrow(MessageT&& message) {
298 return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
300 bool tryPutMessageNoThrow(const MessageT& message) {
301 return putMessageImpl(message, advisoryMaxQueueSize_, false);
305 * Unconditionally put a message on the queue.
307 * This method is like tryPutMessage(), but ignores the maximum queue size
308 * and always puts the message on the queue, even if the maximum queue size
311 * putMessage() may throw std::bad_alloc if memory allocation fails, and may
312 * throw any other exception thrown by the MessageT move/copy constructor.
314 void putMessage(MessageT&& message) {
315 putMessageImpl(std::move(message), 0);
317 void putMessage(const MessageT& message) {
318 putMessageImpl(message, 0);
322 * Put several messages on the queue.
324 template<typename InputIteratorT>
325 void putMessages(InputIteratorT first, InputIteratorT last) {
326 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
328 putMessagesImpl(first, last, IterCategory());
332 * Try to immediately pull a message off of the queue, without blocking.
334 * If a message is immediately available, the result parameter will be
335 * updated to contain the message contents and true will be returned.
337 * If no message is available, false will be returned and result will be left
340 bool tryConsume(MessageT& result) {
345 folly::MSLGuard g(spinlock_);
347 if (UNLIKELY(queue_.empty())) {
351 auto data = std::move(queue_.front());
353 RequestContext::setContext(data.second);
357 // Handle an exception if the assignment operator happens to throw.
358 // We consumed an event but weren't able to pop the message off the
359 // queue. Signal the event again since the message is still in the
369 folly::MSLGuard g(spinlock_);
370 return queue_.size();
374 * Check that the NotificationQueue is being used from the correct process.
376 * If you create a NotificationQueue in one process, then fork, and try to
377 * send messages to the queue from the child process, you're going to have a
378 * bad time. Unfortunately users have (accidentally) run into this.
380 * Because we use an eventfd/pipe, the child process can actually signal the
381 * parent process that an event is ready. However, it can't put anything on
382 * the parent's queue, so the parent wakes up and finds an empty queue. This
383 * check ensures that we catch the problem in the misbehaving child process
384 * code, and crash before signalling the parent process.
386 void checkPid() const {
387 CHECK_EQ(pid_, getpid());
391 // Forbidden copy constructor and assignment operator
392 NotificationQueue(NotificationQueue const &) = delete;
393 NotificationQueue& operator=(NotificationQueue const &) = delete;
395 inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
396 DCHECK(0 == spinlock_.try_lock());
397 if (maxSize > 0 && queue_.size() >= maxSize) {
399 throw std::overflow_error("unable to add message to NotificationQueue: "
407 inline void signalEvent(size_t numAdded = 1) const {
408 static const uint8_t kPipeMessage[] = {
409 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
412 ssize_t bytes_written = 0;
413 ssize_t bytes_expected = 0;
415 // eventfd(2) dictates that we must write a 64-bit integer
416 uint64_t numAdded64(numAdded);
417 bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
418 bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
420 // pipe semantics, add one message for each numAdded
421 bytes_expected = numAdded;
423 size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
424 ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
426 // TODO: if the pipe is full, write will fail with EAGAIN.
427 // See task #1044651 for how this could be handled
432 } while (numAdded > 0);
434 if (bytes_written != bytes_expected) {
435 folly::throwSystemError("failed to signal NotificationQueue after "
440 bool tryConsumeEvent() {
444 rc = ::read(eventfd_, &value, sizeof(value));
447 rc = ::read(pipeFds_[0], &value8, sizeof(value8));
451 // EAGAIN should pretty much be the only error we can ever get.
452 // This means someone else already processed the only available message.
453 assert(errno == EAGAIN);
460 bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
464 folly::MSLGuard g(spinlock_);
465 if (!checkQueueSize(maxSize, throws)) {
468 // We only need to signal an event if not all consumers are
470 if (numActiveConsumers_ < numConsumers_) {
474 std::make_pair(std::move(message),
475 RequestContext::saveContext()));
484 const MessageT& message, size_t maxSize, bool throws=true) {
488 folly::MSLGuard g(spinlock_);
489 if (!checkQueueSize(maxSize, throws)) {
492 if (numActiveConsumers_ < numConsumers_) {
495 queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
503 template<typename InputIteratorT>
504 void putMessagesImpl(InputIteratorT first, InputIteratorT last,
505 std::input_iterator_tag) {
510 folly::MSLGuard g(spinlock_);
511 while (first != last) {
512 queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
516 if (numActiveConsumers_ < numConsumers_) {
525 mutable folly::MicroSpinLock spinlock_;
527 int pipeFds_[2]; // to fallback to on older/non-linux systems
528 uint32_t advisoryMaxQueueSize_;
530 std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
531 int numConsumers_{0};
532 std::atomic<int> numActiveConsumers_{0};
535 template<typename MessageT>
536 NotificationQueue<MessageT>::Consumer::~Consumer() {
537 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
538 // will be non-nullptr. Mark the value that it points to, so that
539 // handlerReady() will know the callback is destroyed, and that it cannot
540 // access any member variables anymore.
541 if (destroyedFlagPtr_) {
542 *destroyedFlagPtr_ = true;
546 template<typename MessageT>
547 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
549 uint32_t numProcessed = 0;
550 bool firstRun = true;
552 SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
554 // Try to decrement the eventfd.
556 // The eventfd is only used to wake up the consumer - there may or
557 // may not actually be an event available (another consumer may
558 // have read it). We don't really care, we only care about
559 // emptying the queue.
561 queue_->tryConsumeEvent();
565 // Now pop the message off of the queue.
567 // We have to manually acquire and release the spinlock here, rather than
568 // using SpinLockHolder since the MessageT has to be constructed while
569 // holding the spinlock and available after we release it. SpinLockHolder
570 // unfortunately doesn't provide a release() method. (We can't construct
571 // MessageT first since we have no guarantee that MessageT has a default
573 queue_->spinlock_.lock();
577 if (UNLIKELY(queue_->queue_.empty())) {
578 // If there is no message, we've reached the end of the queue, return.
579 queue_->spinlock_.unlock();
583 // Pull a message off the queue.
584 auto& data = queue_->queue_.front();
586 MessageT msg(std::move(data.first));
588 RequestContext::setContext(data.second);
589 queue_->queue_.pop_front();
591 // Check to see if the queue is empty now.
592 // We use this as an optimization to see if we should bother trying to
593 // loop again and read another message after invoking this callback.
594 bool wasEmpty = queue_->queue_.empty();
599 // Now unlock the spinlock before we invoke the callback.
600 queue_->spinlock_.unlock();
604 bool callbackDestroyed = false;
605 CHECK(destroyedFlagPtr_ == nullptr);
606 destroyedFlagPtr_ = &callbackDestroyed;
607 messageAvailable(std::move(msg));
609 RequestContext::setContext(old_ctx);
611 // If the callback was destroyed before it returned, we are done
612 if (callbackDestroyed) {
615 destroyedFlagPtr_ = nullptr;
617 // If the callback is no longer installed, we are done.
618 if (queue_ == nullptr) {
622 // If we have hit maxReadAtOnce_, we are done.
624 if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
625 queue_->signalEvent(1);
629 // If the queue was empty before we invoked the callback, it's probable
630 // that it is still empty now. Just go ahead and return, rather than
631 // looping again and trying to re-read from the eventfd. (If a new
632 // message had in fact arrived while we were invoking the callback, we
633 // will simply be woken up the next time around the event loop and will
634 // process the message then.)
638 } catch (const std::exception& ex) {
639 // This catch block is really just to handle the case where the MessageT
640 // constructor throws. The messageAvailable() callback itself is
641 // declared as noexcept and should never throw.
643 // If the MessageT constructor does throw we try to handle it as best as
644 // we can, but we can't work miracles. We will just ignore the error for
645 // now and return. The next time around the event loop we will end up
646 // trying to read the message again. If MessageT continues to throw we
647 // will never make forward progress and will keep trying each time around
650 // Unlock the spinlock.
651 queue_->spinlock_.unlock();
653 // Push a notification back on the eventfd since we didn't actually
654 // read the message off of the queue.
655 queue_->signalEvent(1);
663 template<typename MessageT>
664 void NotificationQueue<MessageT>::Consumer::init(
665 EventBase* eventBase,
666 NotificationQueue* queue) {
667 assert(eventBase->isInEventBaseThread());
668 assert(queue_ == nullptr);
669 assert(!isHandlerRegistered());
677 folly::MSLGuard g(queue_->spinlock_);
678 queue_->numConsumers_++;
680 queue_->signalEvent();
682 if (queue_->eventfd_ >= 0) {
683 initHandler(eventBase, queue_->eventfd_);
685 initHandler(eventBase, queue_->pipeFds_[0]);
689 template<typename MessageT>
690 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
691 if (queue_ == nullptr) {
692 assert(!isHandlerRegistered());
697 folly::MSLGuard g(queue_->spinlock_);
698 queue_->numConsumers_--;
702 assert(isHandlerRegistered());