/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <unistd.h>
#include <folly/io/async/EventBase.h>
-#include <folly/io/async/EventFDWrapper.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/Request.h>
#include <folly/Likely.h>
-#include <folly/SmallLocks.h>
#include <folly/ScopeGuard.h>
+#include <folly/SpinLock.h>
#include <glog/logging.h>
#include <deque>
+#if __linux__ && !__ANDROID__
+#define FOLLY_HAVE_EVENTFD
+#include <folly/io/async/EventFDWrapper.h>
+#endif
+
namespace folly {
/**
*/
void stopConsuming();
+ /**
+ * Consume messages off the queue until it is empty. No messages may be
+ * added to the queue while it is draining, so that the process is bounded.
+ * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
+ * and tryPutMessageNoThrow will return false.
+ *
+ * @returns true if the queue was drained, false otherwise. In practice,
+ * this will only fail if someone else is already draining the queue.
+ */
+ bool consumeUntilDrained() noexcept;
+
/**
* Get the NotificationQueue that this consumer is currently consuming
* messages from. Returns nullptr if the consumer is not currently
virtual void handlerReady(uint16_t events) noexcept;
private:
+ /**
+ * Consume messages off the the queue until
+ * - the queue is empty (1), or
+ * - until the consumer is destroyed, or
+ * - until the consumer is uninstalled, or
+ * - an exception is thrown in the course of dequeueing, or
+ * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
+ *
+ * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
+ */
+ void consumeMessages(bool isDrain) noexcept;
void setActive(bool active, bool shouldLock = false) {
if (!queue_) {
};
enum class FdType {
+ PIPE,
+#ifdef FOLLY_HAVE_EVENTFD
EVENTFD,
- PIPE
+#endif
};
/**
* mostly for testing purposes.
*/
explicit NotificationQueue(uint32_t maxSize = 0,
- FdType fdType = FdType::EVENTFD)
+#ifdef FOLLY_HAVE_EVENTFD
+ FdType fdType = FdType::EVENTFD)
+#else
+ FdType fdType = FdType::PIPE)
+#endif
: eventfd_(-1),
pipeFds_{-1, -1},
advisoryMaxQueueSize_(maxSize),
pid_(getpid()),
queue_() {
- spinlock_.init();
-
RequestContext::getStaticContext();
+#ifdef FOLLY_HAVE_EVENTFD
if (fdType == FdType::EVENTFD) {
eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
if (eventfd_ == -1) {
}
}
}
+#endif
if (fdType == FdType::PIPE) {
if (pipe(pipeFds_)) {
folly::throwSystemError("Failed to create pipe for NotificationQueue",
* If the queue is full, a std::overflow_error will be thrown. The
* setMaxQueueSize() function controls the maximum queue size.
*
+ * If the queue is currently draining, an std::runtime_error will be thrown.
+ *
* This method may contend briefly on a spinlock if many threads are
* concurrently accessing the queue, but for all intents and purposes it will
* immediately place the message on the queue and return.
* No-throw versions of the above. Instead returns true on success, false on
* failure.
*
- * Only std::overflow_error is prevented from being thrown (since this is the
- * common exception case), user code must still catch std::bad_alloc errors.
+ * Only std::overflow_error (the common exception case) and std::runtime_error
+ * (which indicates that the queue is being drained) are prevented from being
+ * thrown. User code must still catch std::bad_alloc errors.
*/
bool tryPutMessageNoThrow(MessageT&& message) {
return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
* and always puts the message on the queue, even if the maximum queue size
* would be exceeded.
*
- * putMessage() may throw std::bad_alloc if memory allocation fails, and may
- * throw any other exception thrown by the MessageT move/copy constructor.
+ * putMessage() may throw
+ * - std::bad_alloc if memory allocation fails, and may
+ * - std::runtime_error if the queue is currently draining
+ * - any other exception thrown by the MessageT move/copy constructor.
*/
void putMessage(MessageT&& message) {
putMessageImpl(std::move(message), 0);
try {
- folly::MSLGuard g(spinlock_);
+ folly::SpinLockGuard g(spinlock_);
if (UNLIKELY(queue_.empty())) {
return false;
}
int size() {
- folly::MSLGuard g(spinlock_);
+ folly::SpinLockGuard g(spinlock_);
return queue_.size();
}
NotificationQueue& operator=(NotificationQueue const &) = delete;
inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
- DCHECK(0 == spinlock_.try_lock());
+ DCHECK(0 == spinlock_.trylock());
if (maxSize > 0 && queue_.size() >= maxSize) {
if (throws) {
throw std::overflow_error("unable to add message to NotificationQueue: "
return true;
}
+ inline bool checkDraining(bool throws=true) {
+ if (UNLIKELY(draining_ && throws)) {
+ throw std::runtime_error("queue is draining, cannot add message");
+ }
+ return draining_;
+ }
+
inline void signalEvent(size_t numAdded = 1) const {
static const uint8_t kPipeMessage[] = {
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
checkPid();
bool signal = false;
{
- folly::MSLGuard g(spinlock_);
- if (!checkQueueSize(maxSize, throws)) {
+ folly::SpinLockGuard g(spinlock_);
+ if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
// We only need to signal an event if not all consumers are
checkPid();
bool signal = false;
{
- folly::MSLGuard g(spinlock_);
- if (!checkQueueSize(maxSize, throws)) {
+ folly::SpinLockGuard g(spinlock_);
+ if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
if (numActiveConsumers_ < numConsumers_) {
bool signal = false;
size_t numAdded = 0;
{
- folly::MSLGuard g(spinlock_);
+ folly::SpinLockGuard g(spinlock_);
+ checkDraining();
while (first != last) {
queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
++first;
}
}
- mutable folly::MicroSpinLock spinlock_;
+ mutable folly::SpinLock spinlock_;
int eventfd_;
int pipeFds_[2]; // to fallback to on older/non-linux systems
uint32_t advisoryMaxQueueSize_;
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
int numConsumers_{0};
std::atomic<int> numActiveConsumers_{0};
+ bool draining_{false};
};
template<typename MessageT>
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
noexcept {
+ consumeMessages(false);
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::consumeMessages(
+ bool isDrain) noexcept {
uint32_t numProcessed = 0;
bool firstRun = true;
setActive(true);
// may not actually be an event available (another consumer may
// have read it). We don't really care, we only care about
// emptying the queue.
- if (firstRun) {
+ if (!isDrain && firstRun) {
queue_->tryConsumeEvent();
firstRun = false;
}
try {
if (UNLIKELY(queue_->queue_.empty())) {
// If there is no message, we've reached the end of the queue, return.
+ setActive(false);
queue_->spinlock_.unlock();
return;
}
// If we have hit maxReadAtOnce_, we are done.
++numProcessed;
- if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+ if (!isDrain && maxReadAtOnce_ > 0 &&
+ numProcessed >= maxReadAtOnce_) {
queue_->signalEvent(1);
return;
}
// Push a notification back on the eventfd since we didn't actually
// read the message off of the queue.
- queue_->signalEvent(1);
+ if (!isDrain) {
+ queue_->signalEvent(1);
+ }
}
return;
queue_ = queue;
{
- folly::MSLGuard g(queue_->spinlock_);
+ folly::SpinLockGuard g(queue_->spinlock_);
queue_->numConsumers_++;
}
queue_->signalEvent();
}
{
- folly::MSLGuard g(queue_->spinlock_);
+ folly::SpinLockGuard g(queue_->spinlock_);
queue_->numConsumers_--;
setActive(false);
}
queue_ = nullptr;
}
+template<typename MessageT>
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+ {
+ folly::SpinLockGuard g(queue_->spinlock_);
+ if (queue_->draining_) {
+ return false;
+ }
+ queue_->draining_ = true;
+ }
+ consumeMessages(true);
+ {
+ folly::SpinLockGuard g(queue_->spinlock_);
+ queue_->draining_ = false;
+ }
+ return true;
+}
+
} // folly