2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
24 #include "folly/io/async/EventBase.h"
25 #include "folly/io/async/EventFDWrapper.h"
26 #include "folly/io/async/EventHandler.h"
27 #include "folly/io/async/Request.h"
28 #include "folly/Likely.h"
29 #include "folly/SmallLocks.h"
31 #include <glog/logging.h>
37 * A producer-consumer queue for passing messages between EventBase threads.
39 * Messages can be added to the queue from any thread. Multiple consumers may
40 * listen to the queue from multiple EventBase threads.
42 * A NotificationQueue may not be destroyed while there are still consumers
43 * registered to receive events from the queue. It is the user's
44 * responsibility to ensure that all consumers are unregistered before the
47 * MessageT should be MoveConstructible (i.e., must support either a move
48 * constructor or a copy constructor, or both). Ideally it's move constructor
49 * (or copy constructor if no move constructor is provided) should never throw
50 * exceptions. If the constructor may throw, the consumers could end up
51 * spinning trying to move a message off the queue and failing, and then
54 template<typename MessageT>
55 class NotificationQueue {
58 * A callback interface for consuming messages from the queue as they arrive.
60 class Consumer : private EventHandler {
62 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
66 destroyedFlagPtr_(nullptr),
67 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
72 * messageAvailable() will be invoked whenever a new
73 * message is available from the pipe.
75 virtual void messageAvailable(MessageT&& message) = 0;
78 * Begin consuming messages from the specified queue.
80 * messageAvailable() will be called whenever a message is available. This
81 * consumer will continue to consume messages until stopConsuming() is
84 * A Consumer may only consume messages from a single NotificationQueue at
85 * a time. startConsuming() should not be called if this consumer is
88 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
89 init(eventBase, queue);
90 registerHandler(READ | PERSIST);
94 * Same as above but registers this event handler as internal so that it
95 * doesn't count towards the pending reader count for the IOLoop.
97 void startConsumingInternal(
98 EventBase* eventBase, NotificationQueue* queue) {
99 init(eventBase, queue);
100 registerInternalHandler(READ | PERSIST);
104 * Stop consuming messages.
106 * startConsuming() may be called again to resume consumption of messages
107 * at a later point in time.
109 void stopConsuming();
112 * Get the NotificationQueue that this consumer is currently consuming
113 * messages from. Returns nullptr if the consumer is not currently
114 * consuming events from any queue.
116 NotificationQueue* getCurrentQueue() const {
121 * Set a limit on how many messages this consumer will read each iteration
122 * around the event loop.
124 * This helps rate-limit how much work the Consumer will do each event loop
125 * iteration, to prevent it from starving other event handlers.
127 * A limit of 0 means no limit will be enforced. If unset, the limit
128 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
130 void setMaxReadAtOnce(uint32_t maxAtOnce) {
131 maxReadAtOnce_ = maxAtOnce;
133 uint32_t getMaxReadAtOnce() const {
134 return maxReadAtOnce_;
137 EventBase* getEventBase() {
141 virtual void handlerReady(uint16_t events) noexcept;
144 void init(EventBase* eventBase, NotificationQueue* queue);
146 NotificationQueue* queue_;
147 bool* destroyedFlagPtr_;
148 uint32_t maxReadAtOnce_;
158 * Create a new NotificationQueue.
160 * If the maxSize parameter is specified, this sets the maximum queue size
161 * that will be enforced by tryPutMessage(). (This size is advisory, and may
162 * be exceeded if producers explicitly use putMessage() instead of
165 * The fdType parameter determines the type of file descriptor used
166 * internally to signal message availability. The default (eventfd) is
167 * preferable for performance and because it won't fail when the queue gets
168 * too long. It is not available on on older and non-linux kernels, however.
169 * In this case the code will fall back to using a pipe, the parameter is
170 * mostly for testing purposes.
172 explicit NotificationQueue(uint32_t maxSize = 0,
173 FdType fdType = FdType::EVENTFD)
176 advisoryMaxQueueSize_(maxSize),
182 RequestContext::getStaticContext();
184 if (fdType == FdType::EVENTFD) {
185 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
186 if (eventfd_ == -1) {
187 if (errno == ENOSYS || errno == EINVAL) {
188 // eventfd not availalble
189 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
190 << errno << ", falling back to pipe mode (is your kernel "
192 fdType = FdType::PIPE;
195 folly::throwSystemError("Failed to create eventfd for "
196 "NotificationQueue", errno);
200 if (fdType == FdType::PIPE) {
201 if (pipe(pipeFds_)) {
202 folly::throwSystemError("Failed to create pipe for NotificationQueue",
206 // put both ends of the pipe into non-blocking mode
207 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
208 folly::throwSystemError("failed to put NotificationQueue pipe read "
209 "endpoint into non-blocking mode", errno);
211 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
212 folly::throwSystemError("failed to put NotificationQueue pipe write "
213 "endpoint into non-blocking mode", errno);
216 ::close(pipeFds_[0]);
217 ::close(pipeFds_[1]);
223 ~NotificationQueue() {
228 if (pipeFds_[0] >= 0) {
229 ::close(pipeFds_[0]);
232 if (pipeFds_[1] >= 0) {
233 ::close(pipeFds_[1]);
239 * Set the advisory maximum queue size.
241 * This maximum queue size affects calls to tryPutMessage(). Message
242 * producers can still use the putMessage() call to unconditionally put a
243 * message on the queue, ignoring the configured maximum queue size. This
244 * can cause the queue size to exceed the configured maximum.
246 void setMaxQueueSize(uint32_t max) {
247 advisoryMaxQueueSize_ = max;
251 * Attempt to put a message on the queue if the queue is not already full.
253 * If the queue is full, a std::overflow_error will be thrown. The
254 * setMaxQueueSize() function controls the maximum queue size.
256 * This method may contend briefly on a spinlock if many threads are
257 * concurrently accessing the queue, but for all intents and purposes it will
258 * immediately place the message on the queue and return.
260 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
261 * may throw any other exception thrown by the MessageT move/copy
264 void tryPutMessage(MessageT&& message) {
265 putMessageImpl(std::move(message), advisoryMaxQueueSize_);
267 void tryPutMessage(const MessageT& message) {
268 putMessageImpl(message, advisoryMaxQueueSize_);
272 * No-throw versions of the above. Instead returns true on success, false on
275 * Only std::overflow_error is prevented from being thrown (since this is the
276 * common exception case), user code must still catch std::bad_alloc errors.
278 bool tryPutMessageNoThrow(MessageT&& message) {
279 return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
281 bool tryPutMessageNoThrow(const MessageT& message) {
282 return putMessageImpl(message, advisoryMaxQueueSize_, false);
286 * Unconditionally put a message on the queue.
288 * This method is like tryPutMessage(), but ignores the maximum queue size
289 * and always puts the message on the queue, even if the maximum queue size
292 * putMessage() may throw std::bad_alloc if memory allocation fails, and may
293 * throw any other exception thrown by the MessageT move/copy constructor.
295 void putMessage(MessageT&& message) {
296 putMessageImpl(std::move(message), 0);
298 void putMessage(const MessageT& message) {
299 putMessageImpl(message, 0);
303 * Put several messages on the queue.
305 template<typename InputIteratorT>
306 void putMessages(InputIteratorT first, InputIteratorT last) {
307 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
309 putMessagesImpl(first, last, IterCategory());
313 * Try to immediately pull a message off of the queue, without blocking.
315 * If a message is immediately available, the result parameter will be
316 * updated to contain the message contents and true will be returned.
318 * If no message is available, false will be returned and result will be left
321 bool tryConsume(MessageT& result) {
323 if (!tryConsumeEvent()) {
329 folly::MSLGuard g(spinlock_);
331 // This shouldn't happen normally. See the comments in
332 // Consumer::handlerReady() for more details.
333 if (UNLIKELY(queue_.empty())) {
334 LOG(ERROR) << "found empty queue after signalled event";
338 auto data = std::move(queue_.front());
340 RequestContext::setContext(data.second);
344 // Handle an exception if the assignment operator happens to throw.
345 // We consumed an event but weren't able to pop the message off the
346 // queue. Signal the event again since the message is still in the
356 folly::MSLGuard g(spinlock_);
357 return queue_.size();
361 * Check that the NotificationQueue is being used from the correct process.
363 * If you create a NotificationQueue in one process, then fork, and try to
364 * send messages to the queue from the child process, you're going to have a
365 * bad time. Unfortunately users have (accidentally) run into this.
367 * Because we use an eventfd/pipe, the child process can actually signal the
368 * parent process that an event is ready. However, it can't put anything on
369 * the parent's queue, so the parent wakes up and finds an empty queue. This
370 * check ensures that we catch the problem in the misbehaving child process
371 * code, and crash before signalling the parent process.
373 void checkPid() const {
374 CHECK_EQ(pid_, getpid());
378 // Forbidden copy constructor and assignment operator
379 NotificationQueue(NotificationQueue const &) = delete;
380 NotificationQueue& operator=(NotificationQueue const &) = delete;
382 inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
383 DCHECK(0 == spinlock_.try_lock());
384 if (maxSize > 0 && queue_.size() >= maxSize) {
386 throw std::overflow_error("unable to add message to NotificationQueue: "
394 inline void signalEvent(size_t numAdded = 1) const {
395 static const uint8_t kPipeMessage[] = {
396 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
399 ssize_t bytes_written = 0;
400 ssize_t bytes_expected = 0;
402 // eventfd(2) dictates that we must write a 64-bit integer
403 uint64_t numAdded64(numAdded);
404 bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
405 bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
407 // pipe semantics, add one message for each numAdded
408 bytes_expected = numAdded;
410 size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
411 ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
413 // TODO: if the pipe is full, write will fail with EAGAIN.
414 // See task #1044651 for how this could be handled
419 } while (numAdded > 0);
421 if (bytes_written != bytes_expected) {
422 folly::throwSystemError("failed to signal NotificationQueue after "
427 bool tryConsumeEvent() {
431 rc = ::read(eventfd_, &value, sizeof(value));
434 rc = ::read(pipeFds_[0], &value8, sizeof(value8));
438 // EAGAIN should pretty much be the only error we can ever get.
439 // This means someone else already processed the only available message.
440 assert(errno == EAGAIN);
447 bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
450 folly::MSLGuard g(spinlock_);
451 if (!checkQueueSize(maxSize, throws)) {
455 std::make_pair(std::move(message),
456 RequestContext::saveContext()));
463 const MessageT& message, size_t maxSize, bool throws=true) {
466 folly::MSLGuard g(spinlock_);
467 if (!checkQueueSize(maxSize, throws)) {
470 queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
476 template<typename InputIteratorT>
477 void putMessagesImpl(InputIteratorT first, InputIteratorT last,
478 std::input_iterator_tag) {
482 folly::MSLGuard g(spinlock_);
483 while (first != last) {
484 queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
489 signalEvent(numAdded);
492 mutable folly::MicroSpinLock spinlock_;
494 int pipeFds_[2]; // to fallback to on older/non-linux systems
495 uint32_t advisoryMaxQueueSize_;
497 std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
500 template<typename MessageT>
501 NotificationQueue<MessageT>::Consumer::~Consumer() {
502 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
503 // will be non-nullptr. Mark the value that it points to, so that
504 // handlerReady() will know the callback is destroyed, and that it cannot
505 // access any member variables anymore.
506 if (destroyedFlagPtr_) {
507 *destroyedFlagPtr_ = true;
511 template<typename MessageT>
512 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
514 uint32_t numProcessed = 0;
516 // Try to decrement the eventfd.
518 // We decrement the eventfd before checking the queue, and only pop a
519 // message off the queue if we read from the eventfd.
521 // Reading the eventfd first allows us to not have to hold the spinlock
522 // while accessing the eventfd. If we popped from the queue first, we
523 // would have to hold the lock while reading from or writing to the
524 // eventfd. (Multiple consumers may be woken up from a single eventfd
525 // notification. If we popped from the queue first, we could end up
526 // popping a message from the queue before the eventfd has been notified by
527 // the producer, unless the consumer and producer both held the spinlock
528 // around the entire operation.)
529 if (!queue_->tryConsumeEvent()) {
530 // no message available right now
534 // Now pop the message off of the queue.
535 // We successfully consumed the eventfd notification.
536 // There should be a message available for us to consume.
538 // We have to manually acquire and release the spinlock here, rather than
539 // using SpinLockHolder since the MessageT has to be constructed while
540 // holding the spinlock and available after we release it. SpinLockHolder
541 // unfortunately doesn't provide a release() method. (We can't construct
542 // MessageT first since we have no guarantee that MessageT has a default
544 queue_->spinlock_.lock();
548 // The eventfd is incremented once for every message, and only
549 // decremented when a message is popped off. There should always be a
550 // message here to read.
551 if (UNLIKELY(queue_->queue_.empty())) {
552 // Unfortunately we have seen this happen in practice if a user forks
553 // the process, and then the child tries to send a message to a
554 // NotificationQueue being monitored by a thread in the parent.
555 // The child can signal the parent via the eventfd, but won't have been
556 // able to put anything on the parent's queue since it has a separate
559 // This is a bug in the sender's code. putMessagesImpl() should cause
560 // the sender to crash now before trying to send a message from the
561 // wrong process. However, just in case let's handle this case in the
562 // consumer without crashing.
563 LOG(ERROR) << "found empty queue after signalled event";
564 queue_->spinlock_.unlock();
568 // Pull a message off the queue.
569 auto& data = queue_->queue_.front();
571 MessageT msg(std::move(data.first));
573 RequestContext::setContext(data.second);
574 queue_->queue_.pop_front();
576 // Check to see if the queue is empty now.
577 // We use this as an optimization to see if we should bother trying to
578 // loop again and read another message after invoking this callback.
579 bool wasEmpty = queue_->queue_.empty();
581 // Now unlock the spinlock before we invoke the callback.
582 queue_->spinlock_.unlock();
586 bool callbackDestroyed = false;
587 CHECK(destroyedFlagPtr_ == nullptr);
588 destroyedFlagPtr_ = &callbackDestroyed;
589 messageAvailable(std::move(msg));
591 RequestContext::setContext(old_ctx);
593 // If the callback was destroyed before it returned, we are done
594 if (callbackDestroyed) {
597 destroyedFlagPtr_ = nullptr;
599 // If the callback is no longer installed, we are done.
600 if (queue_ == nullptr) {
604 // If we have hit maxReadAtOnce_, we are done.
606 if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
610 // If the queue was empty before we invoked the callback, it's probable
611 // that it is still empty now. Just go ahead and return, rather than
612 // looping again and trying to re-read from the eventfd. (If a new
613 // message had in fact arrived while we were invoking the callback, we
614 // will simply be woken up the next time around the event loop and will
615 // process the message then.)
619 } catch (const std::exception& ex) {
620 // This catch block is really just to handle the case where the MessageT
621 // constructor throws. The messageAvailable() callback itself is
622 // declared as noexcept and should never throw.
624 // If the MessageT constructor does throw we try to handle it as best as
625 // we can, but we can't work miracles. We will just ignore the error for
626 // now and return. The next time around the event loop we will end up
627 // trying to read the message again. If MessageT continues to throw we
628 // will never make forward progress and will keep trying each time around
631 // Unlock the spinlock.
632 queue_->spinlock_.unlock();
634 // Push a notification back on the eventfd since we didn't actually
635 // read the message off of the queue.
636 queue_->signalEvent(1);
644 template<typename MessageT>
645 void NotificationQueue<MessageT>::Consumer::init(
646 EventBase* eventBase,
647 NotificationQueue* queue) {
648 assert(eventBase->isInEventBaseThread());
649 assert(queue_ == nullptr);
650 assert(!isHandlerRegistered());
656 if (queue_->eventfd_ >= 0) {
657 initHandler(eventBase, queue_->eventfd_);
659 initHandler(eventBase, queue_->pipeFds_[0]);
663 template<typename MessageT>
664 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
665 if (queue_ == nullptr) {
666 assert(!isHandlerRegistered());
670 assert(isHandlerRegistered());