#include <folly/Exception.h>
#include <folly/FileUtil.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/EventHandler.h>
-#include <folly/io/async/DelayedDestruction.h>
-#include <folly/io/async/Request.h>
#include <folly/Likely.h>
#include <folly/ScopeGuard.h>
#include <folly/SpinLock.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/Request.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
* spinning trying to move a message off the queue and failing, and then
* retrying.
*/
-template<typename MessageT>
+template <typename MessageT>
class NotificationQueue {
public:
/**
* may throw any other exception thrown by the MessageT move/copy
* constructor.
*/
- void tryPutMessage(MessageT&& message) {
- putMessageImpl(std::move(message), advisoryMaxQueueSize_);
- }
- void tryPutMessage(const MessageT& message) {
- putMessageImpl(message, advisoryMaxQueueSize_);
+ template <typename MessageTT>
+ void tryPutMessage(MessageTT&& message) {
+ putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
}
/**
* (which indicates that the queue is being drained) are prevented from being
* thrown. User code must still catch std::bad_alloc errors.
*/
- bool tryPutMessageNoThrow(MessageT&& message) {
- return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
- }
- bool tryPutMessageNoThrow(const MessageT& message) {
- return putMessageImpl(message, advisoryMaxQueueSize_, false);
+ template <typename MessageTT>
+ bool tryPutMessageNoThrow(MessageTT&& message) {
+ return putMessageImpl(
+ std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
}
/**
* - std::runtime_error if the queue is currently draining
* - any other exception thrown by the MessageT move/copy constructor.
*/
- void putMessage(MessageT&& message) {
- putMessageImpl(std::move(message), 0);
- }
- void putMessage(const MessageT& message) {
- putMessageImpl(message, 0);
+ template <typename MessageTT>
+ void putMessage(MessageTT&& message) {
+ putMessageImpl(std::forward<MessageTT>(message), 0);
}
/**
* Put several messages on the queue.
*/
- template<typename InputIteratorT>
+ template <typename InputIteratorT>
void putMessages(InputIteratorT first, InputIteratorT last) {
typedef typename std::iterator_traits<InputIteratorT>::iterator_category
IterCategory;
}
}
- bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
+ template <typename MessageTT>
+ bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
checkPid();
bool signal = false;
{
if (numActiveConsumers_ < numConsumers_) {
signal = true;
}
- queue_.emplace_back(std::move(message), RequestContext::saveContext());
+ queue_.emplace_back(
+ std::forward<MessageTT>(message), RequestContext::saveContext());
if (signal) {
ensureSignalLocked();
}
return true;
}
- bool putMessageImpl(
- const MessageT& message, size_t maxSize, bool throws=true) {
- checkPid();
- bool signal = false;
- {
- folly::SpinLockGuard g(spinlock_);
- if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
- return false;
- }
- if (numActiveConsumers_ < numConsumers_) {
- signal = true;
- }
- queue_.emplace_back(message, RequestContext::saveContext());
- if (signal) {
- ensureSignalLocked();
- }
- }
- return true;
- }
-
- template<typename InputIteratorT>
+ template <typename InputIteratorT>
void putMessagesImpl(InputIteratorT first, InputIteratorT last,
std::input_iterator_tag) {
checkPid();
bool draining_{false};
};
-template<typename MessageT>
+template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::destroy() {
// If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
// will be non-nullptr. Mark the value that it points to, so that
DelayedDestruction::destroy();
}
-template<typename MessageT>
+template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
noexcept {
consumeMessages(false);
}
-template<typename MessageT>
+template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::consumeMessages(
bool isDrain, size_t* numConsumed) noexcept {
DestructorGuard dg(this);
}
}
-template<typename MessageT>
+template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::init(
EventBase* eventBase,
NotificationQueue* queue) {
}
}
-template<typename MessageT>
+template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::stopConsuming() {
if (queue_ == nullptr) {
assert(!isHandlerRegistered());
queue_ = nullptr;
}
-template<typename MessageT>
+template <typename MessageT>
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
size_t* numConsumed) noexcept {
DestructorGuard dg(this);
std::forward<TCallback>(callback)));
}
-} // folly
+} // namespace folly