X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FNotificationQueue.h;h=fa66baea7c9a80d1ddaa642496fe932143ea32ea;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hp=6bdf16fae361fdca48467fb2e0306ffdeaa35e28;hpb=4a03236d4bd2084f957be74ea3f61f2ae531a496;p=folly.git diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index 6bdf16fa..fa66baea 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,29 @@ #pragma once -#include -#include +#include -#include -#include -#include +#include +#include +#include +#include +#include +#include + +#include +#include #include #include #include +#include +#include +#include +#include +#include +#include +#include #include -#include #if __linux__ && !__ANDROID__ #define FOLLY_HAVE_EVENTFD @@ -54,13 +65,13 @@ namespace folly { * spinning trying to move a message off the queue and failing, and then * retrying. */ -template +template class NotificationQueue { public: /** * A callback interface for consuming messages from the queue as they arrive. */ - class Consumer : private EventHandler { + class Consumer : public DelayedDestruction, private EventHandler { public: enum : uint16_t { kDefaultMaxReadAtOnce = 10 }; @@ -69,13 +80,16 @@ class NotificationQueue { destroyedFlagPtr_(nullptr), maxReadAtOnce_(kDefaultMaxReadAtOnce) {} - virtual ~Consumer(); + // create a consumer in-place, without the need to build new class + template + static std::unique_ptr make( + TCallback&& callback); /** * messageAvailable() will be invoked whenever a new * message is available from the pipe. */ - virtual void messageAvailable(MessageT&& message) = 0; + virtual void messageAvailable(MessageT&& message) noexcept = 0; /** * Begin consuming messages from the specified queue. @@ -120,7 +134,7 @@ class NotificationQueue { * @returns true if the queue was drained, false otherwise. In practice, * this will only fail if someone else is already draining the queue. */ - bool consumeUntilDrained() noexcept; + bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept; /** * Get the NotificationQueue that this consumer is currently consuming @@ -152,7 +166,13 @@ class NotificationQueue { return base_; } - virtual void handlerReady(uint16_t events) noexcept; + void handlerReady(uint16_t events) noexcept override; + + protected: + + void destroy() override; + + ~Consumer() override {} private: /** @@ -165,7 +185,7 @@ class NotificationQueue { * * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation. */ - void consumeMessages(bool isDrain) noexcept; + void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept; void setActive(bool active, bool shouldLock = false) { if (!queue_) { @@ -194,6 +214,24 @@ class NotificationQueue { bool active_{false}; }; + class SimpleConsumer { + public: + explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) { + ++queue_.numConsumers_; + } + + ~SimpleConsumer() { + --queue_.numConsumers_; + } + + int getFd() const { + return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0]; + } + + private: + NotificationQueue& queue_; + }; + enum class FdType { PIPE, #ifdef FOLLY_HAVE_EVENTFD @@ -222,17 +260,15 @@ class NotificationQueue { #else FdType fdType = FdType::PIPE) #endif - : eventfd_(-1), - pipeFds_{-1, -1}, - advisoryMaxQueueSize_(maxSize), - pid_(getpid()), - queue_() { - - RequestContext::saveContext(); + : eventfd_(-1), + pipeFds_{-1, -1}, + advisoryMaxQueueSize_(maxSize), + pid_(pid_t(getpid())), + queue_() { #ifdef FOLLY_HAVE_EVENTFD if (fdType == FdType::EVENTFD) { - eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE); + eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventfd_ == -1) { if (errno == ENOSYS || errno == EINVAL) { // eventfd not availalble @@ -314,11 +350,9 @@ class NotificationQueue { * may throw any other exception thrown by the MessageT move/copy * constructor. */ - void tryPutMessage(MessageT&& message) { - putMessageImpl(std::move(message), advisoryMaxQueueSize_); - } - void tryPutMessage(const MessageT& message) { - putMessageImpl(message, advisoryMaxQueueSize_); + template + void tryPutMessage(MessageTT&& message) { + putMessageImpl(std::forward(message), advisoryMaxQueueSize_); } /** @@ -329,11 +363,10 @@ class NotificationQueue { * (which indicates that the queue is being drained) are prevented from being * thrown. User code must still catch std::bad_alloc errors. */ - bool tryPutMessageNoThrow(MessageT&& message) { - return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false); - } - bool tryPutMessageNoThrow(const MessageT& message) { - return putMessageImpl(message, advisoryMaxQueueSize_, false); + template + bool tryPutMessageNoThrow(MessageTT&& message) { + return putMessageImpl( + std::forward(message), advisoryMaxQueueSize_, false); } /** @@ -348,17 +381,15 @@ class NotificationQueue { * - std::runtime_error if the queue is currently draining * - any other exception thrown by the MessageT move/copy constructor. */ - void putMessage(MessageT&& message) { - putMessageImpl(std::move(message), 0); - } - void putMessage(const MessageT& message) { - putMessageImpl(message, 0); + template + void putMessage(MessageTT&& message) { + putMessageImpl(std::forward(message), 0); } /** * Put several messages on the queue. */ - template + template void putMessages(InputIteratorT first, InputIteratorT last) { typedef typename std::iterator_traits::iterator_category IterCategory; @@ -375,34 +406,26 @@ class NotificationQueue { * unmodified. */ bool tryConsume(MessageT& result) { + SCOPE_EXIT { syncSignalAndQueue(); }; + checkPid(); - try { + folly::SpinLockGuard g(spinlock_); - folly::SpinLockGuard g(spinlock_); + if (UNLIKELY(queue_.empty())) { + return false; + } - if (UNLIKELY(queue_.empty())) { - return false; - } + auto& data = queue_.front(); + result = std::move(data.first); + RequestContext::setContext(std::move(data.second)); - auto data = std::move(queue_.front()); - result = data.first; - RequestContext::setContext(data.second); - - queue_.pop_front(); - } catch (...) { - // Handle an exception if the assignment operator happens to throw. - // We consumed an event but weren't able to pop the message off the - // queue. Signal the event again since the message is still in the - // queue. - signalEvent(1); - throw; - } + queue_.pop_front(); return true; } - int size() { + size_t size() const { folly::SpinLockGuard g(spinlock_); return queue_.size(); } @@ -420,9 +443,7 @@ class NotificationQueue { * check ensures that we catch the problem in the misbehaving child process * code, and crash before signalling the parent process. */ - void checkPid() const { - CHECK_EQ(pid_, getpid()); - } + void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); } private: // Forbidden copy constructor and assignment operator @@ -430,7 +451,7 @@ class NotificationQueue { NotificationQueue& operator=(NotificationQueue const &) = delete; inline bool checkQueueSize(size_t maxSize, bool throws=true) const { - DCHECK(0 == spinlock_.trylock()); + DCHECK(0 == spinlock_.try_lock()); if (maxSize > 0 && queue_.size() >= maxSize) { if (throws) { throw std::overflow_error("unable to add message to NotificationQueue: " @@ -448,82 +469,102 @@ class NotificationQueue { return draining_; } - inline void signalEvent(size_t numAdded = 1) const { - static const uint8_t kPipeMessage[] = { - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 - }; +#ifdef __ANDROID__ + // TODO 10860938 Remove after figuring out crash + mutable std::atomic eventBytes_{0}; + mutable std::atomic maxEventBytes_{0}; +#endif + + void ensureSignalLocked() const { + // semantics: empty fd == empty queue <=> !signal_ + if (signal_) { + return; + } ssize_t bytes_written = 0; - ssize_t bytes_expected = 0; - if (eventfd_ >= 0) { - // eventfd(2) dictates that we must write a 64-bit integer - uint64_t numAdded64(numAdded); - bytes_expected = static_cast(sizeof(numAdded64)); - bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64)); - } else { - // pipe semantics, add one message for each numAdded - bytes_expected = numAdded; - do { - size_t messageSize = std::min(numAdded, sizeof(kPipeMessage)); - ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize); - if (rc < 0) { - // TODO: if the pipe is full, write will fail with EAGAIN. - // See task #1044651 for how this could be handled - break; - } - numAdded -= rc; - bytes_written += rc; - } while (numAdded > 0); + size_t bytes_expected = 0; + + do { + if (eventfd_ >= 0) { + // eventfd(2) dictates that we must write a 64-bit integer + uint64_t signal = 1; + bytes_expected = sizeof(signal); + bytes_written = ::write(eventfd_, &signal, bytes_expected); + } else { + uint8_t signal = 1; + bytes_expected = sizeof(signal); + bytes_written = ::write(pipeFds_[1], &signal, bytes_expected); + } + } while (bytes_written == -1 && errno == EINTR); + +#ifdef __ANDROID__ + if (bytes_written > 0) { + eventBytes_ += bytes_written; + maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_); } - if (bytes_written != bytes_expected) { +#endif + + if (bytes_written == ssize_t(bytes_expected)) { + signal_ = true; + } else { +#ifdef __ANDROID__ + LOG(ERROR) << "NotificationQueue Write Error=" << errno + << " bytesInPipe=" << eventBytes_ + << " maxInPipe=" << maxEventBytes_ << " queue=" << size(); +#endif folly::throwSystemError("failed to signal NotificationQueue after " "write", errno); } } - bool tryConsumeEvent() { - uint64_t value = 0; - ssize_t rc = -1; - if (eventfd_ >= 0) { - rc = ::read(eventfd_, &value, sizeof(value)); + void drainSignalsLocked() { + ssize_t bytes_read = 0; + if (eventfd_ > 0) { + uint64_t message; + bytes_read = readNoInt(eventfd_, &message, sizeof(message)); + CHECK(bytes_read != -1 || errno == EAGAIN); } else { - uint8_t value8; - rc = ::read(pipeFds_[0], &value8, sizeof(value8)); - value = value8; + // There should only be one byte in the pipe. To avoid potential leaks we still drain. + uint8_t message[32]; + ssize_t result; + while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) { + bytes_read += result; + } + CHECK(result == -1 && errno == EAGAIN); + LOG_IF(ERROR, bytes_read > 1) + << "[NotificationQueue] Unexpected state while draining pipe: bytes_read=" + << bytes_read << " bytes, expected <= 1"; } - if (rc < 0) { - // EAGAIN should pretty much be the only error we can ever get. - // This means someone else already processed the only available message. - assert(errno == EAGAIN); - return false; + LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0)) + << "[NotificationQueue] Unexpected state while draining signals: signal_=" + << signal_ << " bytes_read=" << bytes_read; + + signal_ = false; + +#ifdef __ANDROID__ + if (bytes_read > 0) { + eventBytes_ -= bytes_read; } - assert(value == 1); - return true; +#endif } - bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) { - checkPid(); - bool signal = false; - { - folly::SpinLockGuard g(spinlock_); - if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) { - return false; - } - // We only need to signal an event if not all consumers are - // awake. - if (numActiveConsumers_ < numConsumers_) { - signal = true; - } - queue_.emplace_back(std::move(message), RequestContext::saveContext()); - } - if (signal) { - signalEvent(); + void ensureSignal() const { + folly::SpinLockGuard g(spinlock_); + ensureSignalLocked(); + } + + void syncSignalAndQueue() { + folly::SpinLockGuard g(spinlock_); + + if (queue_.empty()) { + drainSignalsLocked(); + } else { + ensureSignalLocked(); } - return true; } - bool putMessageImpl( - const MessageT& message, size_t maxSize, bool throws=true) { + template + bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) { checkPid(); bool signal = false; { @@ -531,18 +572,21 @@ class NotificationQueue { if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) { return false; } + // We only need to signal an event if not all consumers are + // awake. if (numActiveConsumers_ < numConsumers_) { signal = true; } - queue_.emplace_back(message, RequestContext::saveContext()); - } - if (signal) { - signalEvent(); + queue_.emplace_back( + std::forward(message), RequestContext::saveContext()); + if (signal) { + ensureSignalLocked(); + } } return true; } - template + template void putMessagesImpl(InputIteratorT first, InputIteratorT last, std::input_iterator_tag) { checkPid(); @@ -559,13 +603,14 @@ class NotificationQueue { if (numActiveConsumers_ < numConsumers_) { signal = true; } - } - if (signal) { - signalEvent(); + if (signal) { + ensureSignalLocked(); + } } } mutable folly::SpinLock spinlock_; + mutable bool signal_{false}; int eventfd_; int pipeFds_[2]; // to fallback to on older/non-linux systems uint32_t advisoryMaxQueueSize_; @@ -576,8 +621,8 @@ class NotificationQueue { bool draining_{false}; }; -template -NotificationQueue::Consumer::~Consumer() { +template +void NotificationQueue::Consumer::destroy() { // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_ // will be non-nullptr. Mark the value that it points to, so that // handlerReady() will know the callback is destroyed, and that it cannot @@ -585,33 +630,34 @@ NotificationQueue::Consumer::~Consumer() { if (destroyedFlagPtr_) { *destroyedFlagPtr_ = true; } + stopConsuming(); + DelayedDestruction::destroy(); } -template -void NotificationQueue::Consumer::handlerReady(uint16_t events) +template +void NotificationQueue::Consumer::handlerReady(uint16_t /*events*/) noexcept { consumeMessages(false); } -template +template void NotificationQueue::Consumer::consumeMessages( - bool isDrain) noexcept { + bool isDrain, size_t* numConsumed) noexcept { + DestructorGuard dg(this); uint32_t numProcessed = 0; - bool firstRun = true; setActive(true); + SCOPE_EXIT { + if (queue_) { + queue_->syncSignalAndQueue(); + } + }; SCOPE_EXIT { setActive(false, /* shouldLock = */ true); }; - while (true) { - // Try to decrement the eventfd. - // - // The eventfd is only used to wake up the consumer - there may or - // may not actually be an event available (another consumer may - // have read it). We don't really care, we only care about - // emptying the queue. - if (!isDrain && firstRun) { - queue_->tryConsumeEvent(); - firstRun = false; + SCOPE_EXIT { + if (numConsumed != nullptr) { + *numConsumed = numProcessed; } - + }; + while (true) { // Now pop the message off of the queue. // // We have to manually acquire and release the spinlock here, rather than @@ -635,8 +681,7 @@ void NotificationQueue::Consumer::consumeMessages( auto& data = queue_->queue_.front(); MessageT msg(std::move(data.first)); - auto old_ctx = - RequestContext::setContext(data.second); + RequestContextScopeGuard rctx(std::move(data.second)); queue_->queue_.pop_front(); // Check to see if the queue is empty now. @@ -656,14 +701,12 @@ void NotificationQueue::Consumer::consumeMessages( CHECK(destroyedFlagPtr_ == nullptr); destroyedFlagPtr_ = &callbackDestroyed; messageAvailable(std::move(msg)); - - RequestContext::setContext(old_ctx); + destroyedFlagPtr_ = nullptr; // If the callback was destroyed before it returned, we are done if (callbackDestroyed) { return; } - destroyedFlagPtr_ = nullptr; // If the callback is no longer installed, we are done. if (queue_ == nullptr) { @@ -674,7 +717,6 @@ void NotificationQueue::Consumer::consumeMessages( ++numProcessed; if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) { - queue_->signalEvent(1); return; } @@ -687,7 +729,7 @@ void NotificationQueue::Consumer::consumeMessages( if (wasEmpty) { return; } - } catch (const std::exception& ex) { + } catch (const std::exception&) { // This catch block is really just to handle the case where the MessageT // constructor throws. The messageAvailable() callback itself is // declared as noexcept and should never throw. @@ -701,12 +743,6 @@ void NotificationQueue::Consumer::consumeMessages( if (locked) { // Unlock the spinlock. queue_->spinlock_.unlock(); - - // Push a notification back on the eventfd since we didn't actually - // read the message off of the queue. - if (!isDrain) { - queue_->signalEvent(1); - } } return; @@ -714,11 +750,11 @@ void NotificationQueue::Consumer::consumeMessages( } } -template +template void NotificationQueue::Consumer::init( EventBase* eventBase, NotificationQueue* queue) { - assert(eventBase->isInEventBaseThread()); + eventBase->dcheckIsInEventBaseThread(); assert(queue_ == nullptr); assert(!isHandlerRegistered()); queue->checkPid(); @@ -731,7 +767,7 @@ void NotificationQueue::Consumer::init( folly::SpinLockGuard g(queue_->spinlock_); queue_->numConsumers_++; } - queue_->signalEvent(); + queue_->ensureSignal(); if (queue_->eventfd_ >= 0) { initHandler(eventBase, queue_->eventfd_); @@ -740,7 +776,7 @@ void NotificationQueue::Consumer::init( } } -template +template void NotificationQueue::Consumer::stopConsuming() { if (queue_ == nullptr) { assert(!isHandlerRegistered()); @@ -759,8 +795,10 @@ void NotificationQueue::Consumer::stopConsuming() { queue_ = nullptr; } -template -bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { +template +bool NotificationQueue::Consumer::consumeUntilDrained( + size_t* numConsumed) noexcept { + DestructorGuard dg(this); { folly::SpinLockGuard g(queue_->spinlock_); if (queue_->draining_) { @@ -768,7 +806,7 @@ bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { } queue_->draining_ = true; } - consumeMessages(true); + consumeMessages(true, numConsumed); { folly::SpinLockGuard g(queue_->spinlock_); queue_->draining_ = false; @@ -776,4 +814,49 @@ bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { return true; } -} // folly +/** + * Creates a NotificationQueue::Consumer wrapping a function object + * Modeled after AsyncTimeout::make + * + */ + +namespace detail { + +template +struct notification_queue_consumer_wrapper + : public NotificationQueue::Consumer { + + template + explicit notification_queue_consumer_wrapper(UCallback&& callback) + : callback_(std::forward(callback)) {} + + // we are being stricter here and requiring noexcept for callback + void messageAvailable(MessageT&& message) noexcept override { + static_assert( + noexcept(std::declval()(std::forward(message))), + "callback must be declared noexcept, e.g.: `[]() noexcept {}`" + ); + + callback_(std::forward(message)); + } + + private: + TCallback callback_; +}; + +} // namespace detail + +template +template +std::unique_ptr::Consumer, + DelayedDestruction::Destructor> +NotificationQueue::Consumer::make(TCallback&& callback) { + return std::unique_ptr::Consumer, + DelayedDestruction::Destructor>( + new detail::notification_queue_consumer_wrapper< + MessageT, + typename std::decay::type>( + std::forward(callback))); +} + +} // namespace folly