X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FNotificationQueue.h;h=3b7d56d907c33c25ef8ae1e2d939d6c33a27f705;hp=b0a2bd732aa4e73812421a07e02d20661aaa39c7;hb=1672380910a8c21cd36095661eb1360f43c93332;hpb=b5338e82505238ec3f4d4639098127e326344870 diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index b0a2bd73..3b7d56d9 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,17 @@ #pragma once -#include -#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include #include #include #include @@ -26,9 +34,11 @@ #include #include #include +#include +#include +#include #include -#include #if __linux__ && !__ANDROID__ #define FOLLY_HAVE_EVENTFD @@ -70,11 +80,16 @@ class NotificationQueue { destroyedFlagPtr_(nullptr), maxReadAtOnce_(kDefaultMaxReadAtOnce) {} + // create a consumer in-place, without the need to build new class + template + static std::unique_ptr make( + TCallback&& callback); + /** * messageAvailable() will be invoked whenever a new * message is available from the pipe. */ - virtual void messageAvailable(MessageT&& message) = 0; + virtual void messageAvailable(MessageT&& message) noexcept = 0; /** * Begin consuming messages from the specified queue. @@ -157,7 +172,7 @@ class NotificationQueue { void destroy() override; - virtual ~Consumer() {} + ~Consumer() override {} private: /** @@ -199,6 +214,24 @@ class NotificationQueue { bool active_{false}; }; + class SimpleConsumer { + public: + explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) { + ++queue_.numConsumers_; + } + + ~SimpleConsumer() { + --queue_.numConsumers_; + } + + int getFd() const { + return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0]; + } + + private: + NotificationQueue& queue_; + }; + enum class FdType { PIPE, #ifdef FOLLY_HAVE_EVENTFD @@ -227,17 +260,17 @@ class NotificationQueue { #else FdType fdType = FdType::PIPE) #endif - : eventfd_(-1), - pipeFds_{-1, -1}, - advisoryMaxQueueSize_(maxSize), - pid_(getpid()), - queue_() { + : eventfd_(-1), + pipeFds_{-1, -1}, + advisoryMaxQueueSize_(maxSize), + pid_(pid_t(getpid())), + queue_() { RequestContext::saveContext(); #ifdef FOLLY_HAVE_EVENTFD if (fdType == FdType::EVENTFD) { - eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE); + eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventfd_ == -1) { if (errno == ENOSYS || errno == EINVAL) { // eventfd not availalble @@ -380,34 +413,26 @@ class NotificationQueue { * unmodified. */ bool tryConsume(MessageT& result) { + SCOPE_EXIT { syncSignalAndQueue(); }; + checkPid(); - try { + folly::SpinLockGuard g(spinlock_); - folly::SpinLockGuard g(spinlock_); + if (UNLIKELY(queue_.empty())) { + return false; + } - if (UNLIKELY(queue_.empty())) { - return false; - } + auto& data = queue_.front(); + result = std::move(data.first); + RequestContext::setContext(std::move(data.second)); - auto data = std::move(queue_.front()); - result = data.first; - RequestContext::setContext(data.second); - - queue_.pop_front(); - } catch (...) { - // Handle an exception if the assignment operator happens to throw. - // We consumed an event but weren't able to pop the message off the - // queue. Signal the event again since the message is still in the - // queue. - signalEvent(1); - throw; - } + queue_.pop_front(); return true; } - int size() { + size_t size() const { folly::SpinLockGuard g(spinlock_); return queue_.size(); } @@ -425,9 +450,7 @@ class NotificationQueue { * check ensures that we catch the problem in the misbehaving child process * code, and crash before signalling the parent process. */ - void checkPid() const { - CHECK_EQ(pid_, getpid()); - } + void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); } private: // Forbidden copy constructor and assignment operator @@ -435,7 +458,7 @@ class NotificationQueue { NotificationQueue& operator=(NotificationQueue const &) = delete; inline bool checkQueueSize(size_t maxSize, bool throws=true) const { - DCHECK(0 == spinlock_.trylock()); + DCHECK(0 == spinlock_.try_lock()); if (maxSize > 0 && queue_.size() >= maxSize) { if (throws) { throw std::overflow_error("unable to add message to NotificationQueue: " @@ -453,57 +476,98 @@ class NotificationQueue { return draining_; } - inline void signalEvent(size_t numAdded = 1) const { - static const uint8_t kPipeMessage[] = { - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 - }; +#ifdef __ANDROID__ + // TODO 10860938 Remove after figuring out crash + mutable std::atomic eventBytes_{0}; + mutable std::atomic maxEventBytes_{0}; +#endif + + void ensureSignalLocked() const { + // semantics: empty fd == empty queue <=> !signal_ + if (signal_) { + return; + } ssize_t bytes_written = 0; - ssize_t bytes_expected = 0; - if (eventfd_ >= 0) { - // eventfd(2) dictates that we must write a 64-bit integer - uint64_t numAdded64(numAdded); - bytes_expected = static_cast(sizeof(numAdded64)); - bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64)); - } else { - // pipe semantics, add one message for each numAdded - bytes_expected = numAdded; - do { - size_t messageSize = std::min(numAdded, sizeof(kPipeMessage)); - ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize); - if (rc < 0) { - // TODO: if the pipe is full, write will fail with EAGAIN. - // See task #1044651 for how this could be handled - break; - } - numAdded -= rc; - bytes_written += rc; - } while (numAdded > 0); + size_t bytes_expected = 0; + + do { + if (eventfd_ >= 0) { + // eventfd(2) dictates that we must write a 64-bit integer + uint64_t signal = 1; + bytes_expected = sizeof(signal); + bytes_written = ::write(eventfd_, &signal, bytes_expected); + } else { + uint8_t signal = 1; + bytes_expected = sizeof(signal); + bytes_written = ::write(pipeFds_[1], &signal, bytes_expected); + } + } while (bytes_written == -1 && errno == EINTR); + +#ifdef __ANDROID__ + if (bytes_written > 0) { + eventBytes_ += bytes_written; + maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_); } - if (bytes_written != bytes_expected) { +#endif + + if (bytes_written == ssize_t(bytes_expected)) { + signal_ = true; + } else { +#ifdef __ANDROID__ + LOG(ERROR) << "NotificationQueue Write Error=" << errno + << " bytesInPipe=" << eventBytes_ + << " maxInPipe=" << maxEventBytes_ << " queue=" << size(); +#endif folly::throwSystemError("failed to signal NotificationQueue after " "write", errno); } } - bool tryConsumeEvent() { - uint64_t value = 0; - ssize_t rc = -1; - if (eventfd_ >= 0) { - rc = ::read(eventfd_, &value, sizeof(value)); + void drainSignalsLocked() { + ssize_t bytes_read = 0; + if (eventfd_ > 0) { + uint64_t message; + bytes_read = readNoInt(eventfd_, &message, sizeof(message)); + CHECK(bytes_read != -1 || errno == EAGAIN); } else { - uint8_t value8; - rc = ::read(pipeFds_[0], &value8, sizeof(value8)); - value = value8; + // There should only be one byte in the pipe. To avoid potential leaks we still drain. + uint8_t message[32]; + ssize_t result; + while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) { + bytes_read += result; + } + CHECK(result == -1 && errno == EAGAIN); + LOG_IF(ERROR, bytes_read > 1) + << "[NotificationQueue] Unexpected state while draining pipe: bytes_read=" + << bytes_read << " bytes, expected <= 1"; } - if (rc < 0) { - // EAGAIN should pretty much be the only error we can ever get. - // This means someone else already processed the only available message. - assert(errno == EAGAIN); - return false; + LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0)) + << "[NotificationQueue] Unexpected state while draining signals: signal_=" + << signal_ << " bytes_read=" << bytes_read; + + signal_ = false; + +#ifdef __ANDROID__ + if (bytes_read > 0) { + eventBytes_ -= bytes_read; + } +#endif + } + + void ensureSignal() const { + folly::SpinLockGuard g(spinlock_); + ensureSignalLocked(); + } + + void syncSignalAndQueue() { + folly::SpinLockGuard g(spinlock_); + + if (queue_.empty()) { + drainSignalsLocked(); + } else { + ensureSignalLocked(); } - assert(value == 1); - return true; } bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) { @@ -520,9 +584,9 @@ class NotificationQueue { signal = true; } queue_.emplace_back(std::move(message), RequestContext::saveContext()); - } - if (signal) { - signalEvent(); + if (signal) { + ensureSignalLocked(); + } } return true; } @@ -540,9 +604,9 @@ class NotificationQueue { signal = true; } queue_.emplace_back(message, RequestContext::saveContext()); - } - if (signal) { - signalEvent(); + if (signal) { + ensureSignalLocked(); + } } return true; } @@ -564,13 +628,14 @@ class NotificationQueue { if (numActiveConsumers_ < numConsumers_) { signal = true; } - } - if (signal) { - signalEvent(); + if (signal) { + ensureSignalLocked(); + } } } mutable folly::SpinLock spinlock_; + mutable bool signal_{false}; int eventfd_; int pipeFds_[2]; // to fallback to on older/non-linux systems uint32_t advisoryMaxQueueSize_; @@ -605,8 +670,12 @@ void NotificationQueue::Consumer::consumeMessages( bool isDrain, size_t* numConsumed) noexcept { DestructorGuard dg(this); uint32_t numProcessed = 0; - bool firstRun = true; setActive(true); + SCOPE_EXIT { + if (queue_) { + queue_->syncSignalAndQueue(); + } + }; SCOPE_EXIT { setActive(false, /* shouldLock = */ true); }; SCOPE_EXIT { if (numConsumed != nullptr) { @@ -614,17 +683,6 @@ void NotificationQueue::Consumer::consumeMessages( } }; while (true) { - // Try to decrement the eventfd. - // - // The eventfd is only used to wake up the consumer - there may or - // may not actually be an event available (another consumer may - // have read it). We don't really care, we only care about - // emptying the queue. - if (!isDrain && firstRun) { - queue_->tryConsumeEvent(); - firstRun = false; - } - // Now pop the message off of the queue. // // We have to manually acquire and release the spinlock here, rather than @@ -648,8 +706,7 @@ void NotificationQueue::Consumer::consumeMessages( auto& data = queue_->queue_.front(); MessageT msg(std::move(data.first)); - auto old_ctx = - RequestContext::setContext(data.second); + RequestContextScopeGuard rctx(std::move(data.second)); queue_->queue_.pop_front(); // Check to see if the queue is empty now. @@ -671,8 +728,6 @@ void NotificationQueue::Consumer::consumeMessages( messageAvailable(std::move(msg)); destroyedFlagPtr_ = nullptr; - RequestContext::setContext(old_ctx); - // If the callback was destroyed before it returned, we are done if (callbackDestroyed) { return; @@ -687,7 +742,6 @@ void NotificationQueue::Consumer::consumeMessages( ++numProcessed; if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) { - queue_->signalEvent(1); return; } @@ -700,7 +754,7 @@ void NotificationQueue::Consumer::consumeMessages( if (wasEmpty) { return; } - } catch (const std::exception& ex) { + } catch (const std::exception&) { // This catch block is really just to handle the case where the MessageT // constructor throws. The messageAvailable() callback itself is // declared as noexcept and should never throw. @@ -714,12 +768,6 @@ void NotificationQueue::Consumer::consumeMessages( if (locked) { // Unlock the spinlock. queue_->spinlock_.unlock(); - - // Push a notification back on the eventfd since we didn't actually - // read the message off of the queue. - if (!isDrain) { - queue_->signalEvent(1); - } } return; @@ -744,7 +792,7 @@ void NotificationQueue::Consumer::init( folly::SpinLockGuard g(queue_->spinlock_); queue_->numConsumers_++; } - queue_->signalEvent(); + queue_->ensureSignal(); if (queue_->eventfd_ >= 0) { initHandler(eventBase, queue_->eventfd_); @@ -791,4 +839,49 @@ bool NotificationQueue::Consumer::consumeUntilDrained( return true; } +/** + * Creates a NotificationQueue::Consumer wrapping a function object + * Modeled after AsyncTimeout::make + * + */ + +namespace detail { + +template +struct notification_queue_consumer_wrapper + : public NotificationQueue::Consumer { + + template + explicit notification_queue_consumer_wrapper(UCallback&& callback) + : callback_(std::forward(callback)) {} + + // we are being stricter here and requiring noexcept for callback + void messageAvailable(MessageT&& message) noexcept override { + static_assert( + noexcept(std::declval()(std::forward(message))), + "callback must be declared noexcept, e.g.: `[]() noexcept {}`" + ); + + callback_(std::forward(message)); + } + + private: + TCallback callback_; +}; + +} // namespace detail + +template +template +std::unique_ptr::Consumer, + DelayedDestruction::Destructor> +NotificationQueue::Consumer::make(TCallback&& callback) { + return std::unique_ptr::Consumer, + DelayedDestruction::Destructor>( + new detail::notification_queue_consumer_wrapper< + MessageT, + typename std::decay::type>( + std::forward(callback))); +} + } // folly