X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FNotificationQueue.h;h=0f26d88d51a13546f61d21bbce11af247370344f;hb=35fcff936a0ba58986269fb05689843f99e89eb5;hp=95c85160d6403e0353f9d81b3acb9ee15ea6fa95;hpb=275ca94d04e44f28cfa411668eb1c1dd8db90b80;p=folly.git diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index 95c85160..0f26d88d 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2016 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,28 @@ #pragma once -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include #include #include +#include #include #include #include #include +#include +#include #include -#include #if __linux__ && !__ANDROID__ #define FOLLY_HAVE_EVENTFD @@ -60,7 +70,7 @@ class NotificationQueue { /** * A callback interface for consuming messages from the queue as they arrive. */ - class Consumer : private EventHandler { + class Consumer : public DelayedDestruction, private EventHandler { public: enum : uint16_t { kDefaultMaxReadAtOnce = 10 }; @@ -69,7 +79,10 @@ class NotificationQueue { destroyedFlagPtr_(nullptr), maxReadAtOnce_(kDefaultMaxReadAtOnce) {} - virtual ~Consumer(); + // create a consumer in-place, without the need to build new class + template + static std::unique_ptr make( + TCallback&& callback); /** * messageAvailable() will be invoked whenever a new @@ -120,7 +133,7 @@ class NotificationQueue { * @returns true if the queue was drained, false otherwise. In practice, * this will only fail if someone else is already draining the queue. */ - bool consumeUntilDrained() noexcept; + bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept; /** * Get the NotificationQueue that this consumer is currently consuming @@ -152,7 +165,13 @@ class NotificationQueue { return base_; } - virtual void handlerReady(uint16_t events) noexcept; + void handlerReady(uint16_t events) noexcept override; + + protected: + + void destroy() override; + + virtual ~Consumer() {} private: /** @@ -165,7 +184,7 @@ class NotificationQueue { * * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation. */ - void consumeMessages(bool isDrain) noexcept; + void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept; void setActive(bool active, bool shouldLock = false) { if (!queue_) { @@ -194,6 +213,24 @@ class NotificationQueue { bool active_{false}; }; + class SimpleConsumer { + public: + explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) { + ++queue_.numConsumers_; + } + + ~SimpleConsumer() { + --queue_.numConsumers_; + } + + int getFd() const { + return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0]; + } + + private: + NotificationQueue& queue_; + }; + enum class FdType { PIPE, #ifdef FOLLY_HAVE_EVENTFD @@ -222,17 +259,17 @@ class NotificationQueue { #else FdType fdType = FdType::PIPE) #endif - : eventfd_(-1), - pipeFds_{-1, -1}, - advisoryMaxQueueSize_(maxSize), - pid_(getpid()), - queue_() { + : eventfd_(-1), + pipeFds_{-1, -1}, + advisoryMaxQueueSize_(maxSize), + pid_(pid_t(getpid())), + queue_() { - RequestContext::getStaticContext(); + RequestContext::saveContext(); #ifdef FOLLY_HAVE_EVENTFD if (fdType == FdType::EVENTFD) { - eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE); + eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventfd_ == -1) { if (errno == ENOSYS || errno == EINVAL) { // eventfd not availalble @@ -375,34 +412,26 @@ class NotificationQueue { * unmodified. */ bool tryConsume(MessageT& result) { + SCOPE_EXIT { syncSignalAndQueue(); }; + checkPid(); - try { + folly::SpinLockGuard g(spinlock_); - folly::SpinLockGuard g(spinlock_); + if (UNLIKELY(queue_.empty())) { + return false; + } - if (UNLIKELY(queue_.empty())) { - return false; - } + auto data = std::move(queue_.front()); + result = data.first; + RequestContext::setContext(data.second); - auto data = std::move(queue_.front()); - result = data.first; - RequestContext::setContext(data.second); - - queue_.pop_front(); - } catch (...) { - // Handle an exception if the assignment operator happens to throw. - // We consumed an event but weren't able to pop the message off the - // queue. Signal the event again since the message is still in the - // queue. - signalEvent(1); - throw; - } + queue_.pop_front(); return true; } - int size() { + size_t size() const { folly::SpinLockGuard g(spinlock_); return queue_.size(); } @@ -420,9 +449,7 @@ class NotificationQueue { * check ensures that we catch the problem in the misbehaving child process * code, and crash before signalling the parent process. */ - void checkPid() const { - CHECK_EQ(pid_, getpid()); - } + void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); } private: // Forbidden copy constructor and assignment operator @@ -448,57 +475,98 @@ class NotificationQueue { return draining_; } - inline void signalEvent(size_t numAdded = 1) const { - static const uint8_t kPipeMessage[] = { - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 - }; +#ifdef __ANDROID__ + // TODO 10860938 Remove after figuring out crash + mutable std::atomic eventBytes_{0}; + mutable std::atomic maxEventBytes_{0}; +#endif + + void ensureSignalLocked() const { + // semantics: empty fd == empty queue <=> !signal_ + if (signal_) { + return; + } ssize_t bytes_written = 0; ssize_t bytes_expected = 0; - if (eventfd_ >= 0) { - // eventfd(2) dictates that we must write a 64-bit integer - uint64_t numAdded64(numAdded); - bytes_expected = static_cast(sizeof(numAdded64)); - bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64)); - } else { - // pipe semantics, add one message for each numAdded - bytes_expected = numAdded; - do { - size_t messageSize = std::min(numAdded, sizeof(kPipeMessage)); - ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize); - if (rc < 0) { - // TODO: if the pipe is full, write will fail with EAGAIN. - // See task #1044651 for how this could be handled - break; - } - numAdded -= rc; - bytes_written += rc; - } while (numAdded > 0); + + do { + if (eventfd_ >= 0) { + // eventfd(2) dictates that we must write a 64-bit integer + uint64_t signal = 1; + bytes_expected = static_cast(sizeof(signal)); + bytes_written = ::write(eventfd_, &signal, bytes_expected); + } else { + uint8_t signal = 1; + bytes_expected = static_cast(sizeof(signal)); + bytes_written = ::write(pipeFds_[1], &signal, bytes_expected); + } + } while (bytes_written == -1 && errno == EINTR); + +#ifdef __ANDROID__ + if (bytes_written > 0) { + eventBytes_ += bytes_written; + maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_); } - if (bytes_written != bytes_expected) { +#endif + + if (bytes_written == bytes_expected) { + signal_ = true; + } else { +#ifdef __ANDROID__ + LOG(ERROR) << "NotificationQueue Write Error=" << errno + << " bytesInPipe=" << eventBytes_ + << " maxInPipe=" << maxEventBytes_ << " queue=" << size(); +#endif folly::throwSystemError("failed to signal NotificationQueue after " "write", errno); } } - bool tryConsumeEvent() { - uint64_t value = 0; - ssize_t rc = -1; - if (eventfd_ >= 0) { - rc = ::read(eventfd_, &value, sizeof(value)); + void drainSignalsLocked() { + ssize_t bytes_read = 0; + if (eventfd_ > 0) { + uint64_t message; + bytes_read = readNoInt(eventfd_, &message, sizeof(message)); + CHECK(bytes_read != -1 || errno == EAGAIN); } else { - uint8_t value8; - rc = ::read(pipeFds_[0], &value8, sizeof(value8)); - value = value8; + // There should only be one byte in the pipe. To avoid potential leaks we still drain. + uint8_t message[32]; + ssize_t result; + while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) { + bytes_read += result; + } + CHECK(result == -1 && errno == EAGAIN); + LOG_IF(ERROR, bytes_read > 1) + << "[NotificationQueue] Unexpected state while draining pipe: bytes_read=" + << bytes_read << " bytes, expected <= 1"; } - if (rc < 0) { - // EAGAIN should pretty much be the only error we can ever get. - // This means someone else already processed the only available message. - assert(errno == EAGAIN); - return false; + LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0)) + << "[NotificationQueue] Unexpected state while draining signals: signal_=" + << signal_ << " bytes_read=" << bytes_read; + + signal_ = false; + +#ifdef __ANDROID__ + if (bytes_read > 0) { + eventBytes_ -= bytes_read; + } +#endif + } + + void ensureSignal() const { + folly::SpinLockGuard g(spinlock_); + ensureSignalLocked(); + } + + void syncSignalAndQueue() { + folly::SpinLockGuard g(spinlock_); + + if (queue_.empty()) { + drainSignalsLocked(); + } else { + ensureSignalLocked(); } - assert(value == 1); - return true; } bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) { @@ -514,12 +582,10 @@ class NotificationQueue { if (numActiveConsumers_ < numConsumers_) { signal = true; } - queue_.push_back( - std::make_pair(std::move(message), - RequestContext::saveContext())); - } - if (signal) { - signalEvent(); + queue_.emplace_back(std::move(message), RequestContext::saveContext()); + if (signal) { + ensureSignalLocked(); + } } return true; } @@ -536,10 +602,10 @@ class NotificationQueue { if (numActiveConsumers_ < numConsumers_) { signal = true; } - queue_.push_back(std::make_pair(message, RequestContext::saveContext())); - } - if (signal) { - signalEvent(); + queue_.emplace_back(message, RequestContext::saveContext()); + if (signal) { + ensureSignalLocked(); + } } return true; } @@ -554,20 +620,21 @@ class NotificationQueue { folly::SpinLockGuard g(spinlock_); checkDraining(); while (first != last) { - queue_.push_back(std::make_pair(*first, RequestContext::saveContext())); + queue_.emplace_back(*first, RequestContext::saveContext()); ++first; ++numAdded; } if (numActiveConsumers_ < numConsumers_) { signal = true; } - } - if (signal) { - signalEvent(); + if (signal) { + ensureSignalLocked(); + } } } mutable folly::SpinLock spinlock_; + mutable bool signal_{false}; int eventfd_; int pipeFds_[2]; // to fallback to on older/non-linux systems uint32_t advisoryMaxQueueSize_; @@ -579,7 +646,7 @@ class NotificationQueue { }; template -NotificationQueue::Consumer::~Consumer() { +void NotificationQueue::Consumer::destroy() { // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_ // will be non-nullptr. Mark the value that it points to, so that // handlerReady() will know the callback is destroyed, and that it cannot @@ -587,33 +654,34 @@ NotificationQueue::Consumer::~Consumer() { if (destroyedFlagPtr_) { *destroyedFlagPtr_ = true; } + stopConsuming(); + DelayedDestruction::destroy(); } template -void NotificationQueue::Consumer::handlerReady(uint16_t events) +void NotificationQueue::Consumer::handlerReady(uint16_t /*events*/) noexcept { consumeMessages(false); } template void NotificationQueue::Consumer::consumeMessages( - bool isDrain) noexcept { + bool isDrain, size_t* numConsumed) noexcept { + DestructorGuard dg(this); uint32_t numProcessed = 0; - bool firstRun = true; setActive(true); + SCOPE_EXIT { + if (queue_) { + queue_->syncSignalAndQueue(); + } + }; SCOPE_EXIT { setActive(false, /* shouldLock = */ true); }; - while (true) { - // Try to decrement the eventfd. - // - // The eventfd is only used to wake up the consumer - there may or - // may not actually be an event available (another consumer may - // have read it). We don't really care, we only care about - // emptying the queue. - if (!isDrain && firstRun) { - queue_->tryConsumeEvent(); - firstRun = false; + SCOPE_EXIT { + if (numConsumed != nullptr) { + *numConsumed = numProcessed; } - + }; + while (true) { // Now pop the message off of the queue. // // We have to manually acquire and release the spinlock here, rather than @@ -637,8 +705,7 @@ void NotificationQueue::Consumer::consumeMessages( auto& data = queue_->queue_.front(); MessageT msg(std::move(data.first)); - auto old_ctx = - RequestContext::setContext(data.second); + RequestContextScopeGuard rctx(std::move(data.second)); queue_->queue_.pop_front(); // Check to see if the queue is empty now. @@ -658,14 +725,12 @@ void NotificationQueue::Consumer::consumeMessages( CHECK(destroyedFlagPtr_ == nullptr); destroyedFlagPtr_ = &callbackDestroyed; messageAvailable(std::move(msg)); - - RequestContext::setContext(old_ctx); + destroyedFlagPtr_ = nullptr; // If the callback was destroyed before it returned, we are done if (callbackDestroyed) { return; } - destroyedFlagPtr_ = nullptr; // If the callback is no longer installed, we are done. if (queue_ == nullptr) { @@ -676,7 +741,6 @@ void NotificationQueue::Consumer::consumeMessages( ++numProcessed; if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) { - queue_->signalEvent(1); return; } @@ -703,12 +767,6 @@ void NotificationQueue::Consumer::consumeMessages( if (locked) { // Unlock the spinlock. queue_->spinlock_.unlock(); - - // Push a notification back on the eventfd since we didn't actually - // read the message off of the queue. - if (!isDrain) { - queue_->signalEvent(1); - } } return; @@ -733,7 +791,7 @@ void NotificationQueue::Consumer::init( folly::SpinLockGuard g(queue_->spinlock_); queue_->numConsumers_++; } - queue_->signalEvent(); + queue_->ensureSignal(); if (queue_->eventfd_ >= 0) { initHandler(eventBase, queue_->eventfd_); @@ -762,7 +820,9 @@ void NotificationQueue::Consumer::stopConsuming() { } template -bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { +bool NotificationQueue::Consumer::consumeUntilDrained( + size_t* numConsumed) noexcept { + DestructorGuard dg(this); { folly::SpinLockGuard g(queue_->spinlock_); if (queue_->draining_) { @@ -770,7 +830,7 @@ bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { } queue_->draining_ = true; } - consumeMessages(true); + consumeMessages(true, numConsumed); { folly::SpinLockGuard g(queue_->spinlock_); queue_->draining_ = false; @@ -778,4 +838,49 @@ bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { return true; } +/** + * Creates a NotificationQueue::Consumer wrapping a function object + * Modeled after AsyncTimeout::make + * + */ + +namespace detail { + +template +struct notification_queue_consumer_wrapper + : public NotificationQueue::Consumer { + + template + explicit notification_queue_consumer_wrapper(UCallback&& callback) + : callback_(std::forward(callback)) {} + + // we are being stricter here and requiring noexcept for callback + void messageAvailable(MessageT&& message) override { + static_assert( + noexcept(std::declval()(std::forward(message))), + "callback must be declared noexcept, e.g.: `[]() noexcept {}`" + ); + + callback_(std::forward(message)); + } + + private: + TCallback callback_; +}; + +} // namespace detail + +template +template +std::unique_ptr::Consumer, + DelayedDestruction::Destructor> +NotificationQueue::Consumer::make(TCallback&& callback) { + return std::unique_ptr::Consumer, + DelayedDestruction::Destructor>( + new detail::notification_queue_consumer_wrapper< + MessageT, + typename std::decay::type>( + std::forward(callback))); +} + } // folly