From: James Sedgwick
Date: Thu, 14 May 2015 00:45:10 +0000 (-0700)
Subject: via with priority
X-Git-Tag: v0.39.0~15
X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=c8250894a847994e9f76960564a6a56d68b85f5c;p=folly.git

via with priority

Summary: I wish I could just have an add(Func, priority) overload, but the damned
overloaded-virtual warnings (-Woverloaded-virtual) become a nightmare, so it's
addWithPriority. I also switched priority to an int8_t in the hopes of reducing
Core size; turns out the std::atomic holding it is 8 bytes anyway :( I left it
that way because you really shouldn't be using more than 256 priorities.

The biggest remaining problem is the data race between the two atomics, executor_
and priority_. Should we just use a MicroSpinLock to co-protect them? We could
probably save some size over the atomics that way. (Two standalone sketches of
the priority mapping and the locking pattern are appended after the diff.)

Test Plan: unit tests

Reviewed By: hans@fb.com

Subscribers: hannesr, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2039619

Tasks: 6928162

Signature: t1:2039619:1431551266:3b31ed2329301aaa9c32f0f41b6e61f3482d570e
---
diff --git a/folly/Executor.h b/folly/Executor.h
index 175a7b09..c7900459 100644
--- a/folly/Executor.h
+++ b/folly/Executor.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include
+#include <climits>
 #include
 
 namespace folly {
@@ -33,6 +34,21 @@ class Executor {
   /// variants must be threadsafe.
   virtual void add(Func) = 0;
 
+  /// Enqueue a function with a given priority, where 0 is the medium priority.
+  /// This is up to the implementation to enforce.
+  virtual void addWithPriority(Func, int8_t priority) {
+    throw std::runtime_error(
+        "addWithPriority() is not implemented for this Executor");
+  }
+
+  virtual uint8_t getNumPriorities() const {
+    return 1;
+  }
+
+  static const int8_t LO_PRI = SCHAR_MIN;
+  static const int8_t MID_PRI = 0;
+  static const int8_t HI_PRI = SCHAR_MAX;
+
   /// A convenience function for shared_ptr to legacy functors.
   ///
   /// Sometimes you have a functor that is move-only, and therefore can't be
diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h
index 003c9c89..28cb216f 100644
--- a/folly/futures/Future-inl.h
+++ b/folly/futures/Future-inl.h
@@ -422,22 +422,22 @@ Optional<Try<T>> Future<T>::poll() {
 }
 
 template <class T>
-inline Future<T> Future<T>::via(Executor* executor) && {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
   throwIfInvalid();
 
-  setExecutor(executor);
+  setExecutor(executor, priority);
 
   return std::move(*this);
 }
 
 template <class T>
-inline Future<T> Future<T>::via(Executor* executor) & {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   throwIfInvalid();
 
   MoveWrapper<Promise<T>> p;
   auto f = p->getFuture();
   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
-  return std::move(f).via(executor);
+  return std::move(f).via(executor, priority);
 }
 
 template <class T>
@@ -526,8 +526,8 @@ inline Future<T> makeFuture(Try<T>&& t) {
 }
 
 // via
-inline Future<void> via(Executor* executor) {
-  return makeFuture().via(executor);
+Future<void> via(Executor* executor, int8_t priority) {
+  return makeFuture().via(executor, priority);
 }
 
 // mapSetCallback calls func(i, Try<T>) when every future completes
diff --git a/folly/futures/Future.h b/folly/futures/Future.h
index d0824aa8..ccea88b3 100644
--- a/folly/futures/Future.h
+++ b/folly/futures/Future.h
@@ -97,12 +97,16 @@ class Future {
   // The ref-qualifier allows for `this` to be moved out so we
   // don't get access-after-free situations in chaining.
   // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
-  inline Future<T> via(Executor* executor) &&;
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &&;
 
   /// This variant creates a new future, where the ref-qualifier && version
   /// moves `this` out. This one is less efficient but avoids confusing users
   /// when "return f.via(x);" fails.
-  inline Future<T> via(Executor* executor) &;
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &;
 
   /** True when the result (or exception) is ready. */
   bool isReady() const;
@@ -405,7 +409,9 @@ class Future {
   thenImplementation(F func, detail::argResult<isTry, F, Args...>);
 
   Executor* getExecutor() { return core_->getExecutor(); }
-  void setExecutor(Executor* x) { core_->setExecutor(x); }
+  void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
+    core_->setExecutor(x, priority);
+  }
 };
 
 } // folly
diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h
index 65e2cb1d..3729b622 100644
--- a/folly/futures/detail/Core.h
+++ b/folly/futures/detail/Core.h
@@ -227,17 +227,20 @@ class Core {
   bool isActive() { return active_; }
 
   /// Call only from Future thread
-  void setExecutor(Executor* x) {
+  void setExecutor(Executor* x, int8_t priority) {
+    folly::MSLGuard g(executorLock_);
     executor_ = x;
+    priority_ = priority;
   }
 
   Executor* getExecutor() {
+    folly::MSLGuard g(executorLock_);
     return executor_;
   }
 
   /// Call only from Future thread
   void raise(exception_wrapper e) {
-    std::lock_guard<MicroSpinLock> guard(interruptLock_);
+    folly::MSLGuard guard(interruptLock_);
     if (!interrupt_ && !hasResult()) {
       interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
       if (interruptHandler_) {
@@ -248,7 +251,7 @@ class Core {
 
   /// Call only from Promise thread
   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
-    std::lock_guard<MicroSpinLock> guard(interruptLock_);
+    folly::MSLGuard guard(interruptLock_);
     if (!hasResult()) {
       if (interrupt_) {
         fn(*interrupt_);
@@ -277,14 +280,28 @@ class Core {
 
     RequestContext::setContext(context_);
 
     // TODO(6115514) semantic race on reading executor_ and setExecutor()
-    Executor* x = executor_;
+    Executor* x;
+    int8_t priority;
+    {
+      folly::MSLGuard g(executorLock_);
+      x = executor_;
+      priority = priority_;
+    }
+
     if (x) {
       ++attached_; // keep Core alive until executor did its thing
       try {
-        x->add([this]() mutable {
-          SCOPE_EXIT { detachOne(); };
-          callback_(std::move(*result_));
-        });
+        if (LIKELY(x->getNumPriorities() == 1)) {
+          x->add([this]() mutable {
+            SCOPE_EXIT { detachOne(); };
+            callback_(std::move(*result_));
+          });
+        } else {
+          x->addWithPriority([this]() mutable {
+            SCOPE_EXIT { detachOne(); };
+            callback_(std::move(*result_));
+          }, priority);
+        }
       } catch (...) {
         result_ = Try<T>(exception_wrapper(std::current_exception()));
         callback_(std::move(*result_));
@@ -307,12 +324,14 @@ class Core {
   std::atomic<unsigned char> attached_ {2};
   std::atomic<bool> active_ {true};
   folly::MicroSpinLock interruptLock_ {0};
+  folly::MicroSpinLock executorLock_ {0};
+  int8_t priority_ {-1};
+  Executor* executor_ {nullptr};
   folly::Optional<Try<T>> result_ {};
   std::function<void(Try<T>&&)> callback_ {nullptr};
   static constexpr size_t lambdaBufSize = 8 * sizeof(void*);
   char lambdaBuf_[lambdaBufSize];
   std::shared_ptr<RequestContext> context_ {nullptr};
-  std::atomic<Executor*> executor_ {nullptr};
   std::unique_ptr<exception_wrapper> interrupt_ {};
   std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
 };
diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h
index 744a01ac..c7456411 100644
--- a/folly/futures/helpers.h
+++ b/folly/futures/helpers.h
@@ -128,10 +128,14 @@ Future<T> makeFuture(Try<T>&& t);
  * This is just syntactic sugar for makeFuture().via(executor)
  *
  * @param executor the Executor to call back on
+ * @param priority optionally, the priority to add with. Defaults to 0, which
+ * represents medium priority.
  *
  * @returns a void Future that will call back on the given executor
  */
-inline Future<void> via(Executor* executor);
+inline Future<void> via(
+    Executor* executor,
+    int8_t priority = Executor::MID_PRI);
 
 /** When all the input Futures complete, the returned Future will complete.
   Errors do not cause early termination; this Future will always succeed
diff --git a/folly/futures/test/ViaTest.cpp b/folly/futures/test/ViaTest.cpp
index 3ade6679..061dc4c4 100644
--- a/folly/futures/test/ViaTest.cpp
+++ b/folly/futures/test/ViaTest.cpp
@@ -185,6 +185,49 @@ TEST(Via, chain3) {
   EXPECT_EQ(3, count);
 }
 
+struct PriorityExecutor : public Executor {
+  void add(Func f) override {}
+
+  void addWithPriority(Func, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    int p = priority < 0 ?
+      std::max(0, mid + priority) :
+      std::min(getNumPriorities() - 1, mid + priority);
+    EXPECT_LT(p, 3);
+    EXPECT_GE(p, 0);
+    if (p == 0) {
+      count0++;
+    } else if (p == 1) {
+      count1++;
+    } else if (p == 2) {
+      count2++;
+    }
+  }
+
+  uint8_t getNumPriorities() const override {
+    return 3;
+  }
+
+  int count0{0};
+  int count1{0};
+  int count2{0};
+};
+
+TEST(Via, priority) {
+  PriorityExecutor exe;
+  via(&exe, -1).then([]{});
+  via(&exe, 0).then([]{});
+  via(&exe, 1).then([]{});
+  via(&exe, 42).then([]{}); // overflow should go to max priority
+  via(&exe, -42).then([]{}); // underflow should go to min priority
+  via(&exe).then([]{}); // default to mid priority
+  via(&exe, Executor::LO_PRI).then([]{});
+  via(&exe, Executor::HI_PRI).then([]{});
+  EXPECT_EQ(3, exe.count0);
+  EXPECT_EQ(2, exe.count1);
+  EXPECT_EQ(3, exe.count2);
+}
+
 TEST(Via, then2) {
   ManualExecutor x1, x2;
   bool a = false, b = false, c = false;
diff --git a/folly/wangle/concurrent/BlockingQueue.h b/folly/wangle/concurrent/BlockingQueue.h
index 14d9c6a5..ebfdc18d 100644
--- a/folly/wangle/concurrent/BlockingQueue.h
+++ b/folly/wangle/concurrent/BlockingQueue.h
@@ -25,14 +25,12 @@ class BlockingQueue {
  public:
   virtual ~BlockingQueue() {}
   virtual void add(T item) = 0;
-  virtual void addWithPriority(T item, uint32_t priority) {
+  virtual void addWithPriority(T item, int8_t priority) {
     LOG_FIRST_N(WARNING, 1) <<
       "add(item, priority) called on a non-priority queue";
     add(std::move(item));
   }
-  virtual uint32_t getNumPriorities() {
-    LOG_FIRST_N(WARNING, 1) <<
-      "getNumPriorities() called on a non-priority queue";
+  virtual uint8_t getNumPriorities() {
     return 1;
   }
   virtual T take() = 0;
diff --git a/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp
index fcc835cd..864bd3a1 100644
--- a/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp
+++ b/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp
@@ -20,7 +20,6 @@ namespace folly { namespace wangle {
 
 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
-const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
@@ -48,7 +47,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
-    uint32_t numPriorities,
+    int8_t numPriorities,
     std::shared_ptr<ThreadFactory> threadFactory)
     : CPUThreadPoolExecutor(
           numThreads,
@@ -59,7 +58,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
-    uint32_t numPriorities,
+    int8_t numPriorities,
     size_t maxQueueSize,
     std::shared_ptr<ThreadFactory> threadFactory)
     : CPUThreadPoolExecutor(
@@ -87,22 +86,22 @@ void CPUThreadPoolExecutor::add(
       CPUTask(std::move(func), expiration, std::move(expireCallback)));
 }
 
-void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
+void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
   add(std::move(func), priority, std::chrono::milliseconds(0));
 }
 
 void CPUThreadPoolExecutor::add(
     Func func,
-    uint32_t priority,
+    int8_t priority,
     std::chrono::milliseconds expiration,
     Func expireCallback) {
-  CHECK(priority < getNumPriorities());
+  CHECK(getNumPriorities() > 0);
   taskQueue_->addWithPriority(
       CPUTask(std::move(func), expiration, std::move(expireCallback)),
      priority);
 }
 
-uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
+uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
   return taskQueue_->getNumPriorities();
 }
 
@@ -142,7 +141,7 @@ void CPUThreadPoolExecutor::stopThreads(size_t n) {
   CHECK(stoppedThreads_.size() == 0);
   threadsToStop_ = n;
   for (size_t i = 0; i < n; i++) {
-    taskQueue_->add(CPUTask());
+    taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
   }
 }
diff --git a/folly/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/wangle/concurrent/CPUThreadPoolExecutor.h
index 56833e22..7b85ae1f 100644
--- a/folly/wangle/concurrent/CPUThreadPoolExecutor.h
+++ b/folly/wangle/concurrent/CPUThreadPoolExecutor.h
@@ -24,27 +24,26 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
  public:
   struct CPUTask;
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
      size_t numThreads,
      std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
      std::shared_ptr<ThreadFactory> threadFactory =
          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
 
   explicit CPUThreadPoolExecutor(size_t numThreads);
-
-  explicit CPUThreadPoolExecutor(
+CPUThreadPoolExecutor(
      size_t numThreads,
      std::shared_ptr<ThreadFactory> threadFactory);
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
      size_t numThreads,
-     uint32_t numPriorities,
+     int8_t numPriorities,
      std::shared_ptr<ThreadFactory> threadFactory =
          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
      size_t numThreads,
-     uint32_t numPriorities,
+     int8_t numPriorities,
      size_t maxQueueSize,
      std::shared_ptr<ThreadFactory> threadFactory =
          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
@@ -57,14 +56,14 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr) override;
 
-  void add(Func func, uint32_t priority);
+  void addWithPriority(Func func, int8_t priority) override;
   void add(
       Func func,
-      uint32_t priority,
+      int8_t priority,
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr);
 
-  uint32_t getNumPriorities() const;
+  uint8_t getNumPriorities() const override;
 
   struct CPUTask : public ThreadPoolExecutor::Task {
     // Must be noexcept move constructible so it can be used in MPMCQueue
@@ -84,7 +83,6 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
   };
 
   static const size_t kDefaultMaxQueueSize;
-  static const size_t kDefaultNumPriorities;
 
  protected:
   BlockingQueue<CPUTask>* getTaskQueue();
diff --git a/folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h b/folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h
index 0e484715..583a9a34 100644
--- a/folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h
+++ b/folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h
@@ -24,26 +24,29 @@ namespace folly { namespace wangle {
 
 template <class T>
 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
  public:
-  explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
-    CHECK(numPriorities > 0);
+  explicit PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t capacity) {
     queues_.reserve(numPriorities);
-    for (uint32_t i = 0; i < numPriorities; i++) {
+    for (int8_t i = 0; i < numPriorities; i++) {
       queues_.push_back(MPMCQueue<T>(capacity));
     }
   }
 
-  uint32_t getNumPriorities() override {
+  uint8_t getNumPriorities() override {
     return queues_.size();
   }
 
-  // Add at lowest priority by default
+  // Add at medium priority by default
   void add(T item) override {
-    addWithPriority(std::move(item), 0);
+    addWithPriority(std::move(item), Executor::MID_PRI);
   }
 
-  void addWithPriority(T item, uint32_t priority) override {
-    CHECK(priority < queues_.size());
-    if (!queues_[priority].write(std::move(item))) {
+  void addWithPriority(T item, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    size_t queue = priority < 0 ?
+        std::max(0, mid + priority) :
+        std::min(getNumPriorities() - 1, mid + priority);
+    CHECK(queue < queues_.size());
+    if (!queues_[queue].write(std::move(item))) {
       throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
     }
     sem_.post();
diff --git a/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
index d3fca8c4..8a6fcc02 100644
--- a/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
+++ b/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
@@ -310,10 +310,10 @@ TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
   };
   CPUThreadPoolExecutor pool(0, 2);
   for (int i = 0; i < 50; i++) {
-    pool.add(lopri, 0);
+    pool.addWithPriority(lopri, Executor::LO_PRI);
   }
   for (int i = 0; i < 50; i++) {
-    pool.add(hipri, 1);
+    pool.addWithPriority(hipri, Executor::HI_PRI);
   }
   pool.setNumThreads(1);
   pool.join();
@@ -372,3 +372,24 @@ TEST(ThreadPoolExecutorTest, CPUObserver) {
 
   observer->checkCalls();
 }
+
+TEST(ThreadPoolExecutorTest, AddWithPriority) {
+  std::atomic_int c{0};
+  auto f = [&]{ c++; };
+
+  // IO exe doesn't support priorities
+  IOThreadPoolExecutor ioExe(10);
+  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
+
+  CPUThreadPoolExecutor cpuExe(10, 3);
+  cpuExe.addWithPriority(f, -1);
+  cpuExe.addWithPriority(f, 0);
+  cpuExe.addWithPriority(f, 1);
+  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
+  cpuExe.addWithPriority(f, 2); // will add at the highest priority
+  cpuExe.addWithPriority(f, Executor::LO_PRI);
+  cpuExe.addWithPriority(f, Executor::HI_PRI);
+  cpuExe.join();
+
+  EXPECT_EQ(7, c);
+}
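
Not part of the diff above, but useful when skimming it: the heart of the change
is how a signed int8_t priority (LO_PRI = SCHAR_MIN, MID_PRI = 0, HI_PRI =
SCHAR_MAX) is folded onto however many queues an executor actually has.
PriorityLifoSemMPMCQueue::addWithPriority and the test PriorityExecutor both use
the same max/min clamp. Below is a minimal standalone sketch of that mapping;
the constants and mapToQueue() here are stand-ins rather than folly APIs, so the
snippet compiles without the library.

  #include <algorithm>
  #include <climits>
  #include <cstdint>
  #include <cstdio>

  // Stand-ins for the constants the diff adds to folly::Executor.
  constexpr int8_t LO_PRI  = SCHAR_MIN;  // lowest priority
  constexpr int8_t MID_PRI = 0;          // default / medium priority
  constexpr int8_t HI_PRI  = SCHAR_MAX;  // highest priority

  // Map a signed priority onto a queue index in [0, numPriorities).
  // Negative priorities clamp toward queue 0, positive ones toward the top.
  int mapToQueue(int8_t priority, int numPriorities) {
    int mid = numPriorities / 2;
    return priority < 0
        ? std::max(0, mid + priority)
        : std::min(numPriorities - 1, mid + priority);
  }

  int main() {
    const int numPriorities = 3;  // e.g. a CPUThreadPoolExecutor(10, 3)
    const int8_t samples[] = {LO_PRI, -42, -1, MID_PRI, 1, 42, HI_PRI};
    for (int8_t p : samples) {
      std::printf("priority %4d -> queue %d\n", p, mapToQueue(p, numPriorities));
    }
    // -128, -42 and -1 land in queue 0; 0 lands in queue 1; 1, 42 and 127 land
    // in queue 2 -- the same distribution the new TEST(Via, priority) asserts.
    return 0;
  }

On the folly side of the diff, the equivalent call sites are
via(&executor, Executor::HI_PRI).then(...) for futures and
cpuExecutor.addWithPriority(func, Executor::LO_PRI) for the thread pool, as
exercised by the new ViaTest and ThreadPoolExecutorTest cases.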
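The open question in the summary about the executor_/priority_ race is what the
new executorLock_ in Core.h answers: one small lock guards both fields, so
doCallback() reads a consistent (executor, priority) pair. The sketch below
shows the same pattern with stand-in types; std::mutex takes the place of
folly::MicroSpinLock/MSLGuard, and CoreSketch and its method names are
illustrative only, not folly code.

  #include <cstdint>
  #include <mutex>
  #include <utility>

  struct Executor;  // opaque for the purposes of the sketch

  class CoreSketch {
   public:
    // Write both fields under the same lock (cf. Core::setExecutor in the diff).
    void setExecutor(Executor* x, int8_t priority) {
      std::lock_guard<std::mutex> g(lock_);
      executor_ = x;
      priority_ = priority;
    }

    // Snapshot both fields under the same lock, the way doCallback() now does,
    // instead of reading two independent atomics and racing between them.
    std::pair<Executor*, int8_t> executorAndPriority() {
      std::lock_guard<std::mutex> g(lock_);
      return {executor_, priority_};
    }

   private:
    std::mutex lock_;
    Executor* executor_ {nullptr};
    int8_t priority_ {0};
  };

  int main() {
    CoreSketch core;
    core.setExecutor(nullptr, 1);
    auto snap = core.executorAndPriority();  // consistent (executor, priority)
    return snap.second == 1 ? 0 : 1;
  }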