CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
- std::unique_ptr<BlockingQueue<Task>> taskQueue,
+ std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::unique_ptr<ThreadFactory> threadFactory)
: ThreadPoolExecutor(numThreads, std::move(threadFactory)),
taskQueue_(std::move(taskQueue)) {
+// Wraps func in a (non-poison) CPUTask and hands it to taskQueue_.
void CPUThreadPoolExecutor::add(Func func) {
// TODO handle enqueue failure, here and in other add() callsites
- taskQueue_->add(Task(std::move(func)));
+ taskQueue_->add(CPUTask(std::move(func)));
}
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
while (1) {
// TODO expiration / codel
- auto t = taskQueue_->take();
- if (UNLIKELY(t.poison)) {
+ auto task = taskQueue_->take();
+ if (UNLIKELY(task.poison)) {
CHECK(threadsToStop_-- > 0);
stoppedThreads_.add(thread);
return;
} else {
- thread->idle = false;
- try {
- t.func();
- } catch (const std::exception& e) {
- LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " <<
- typeid(e).name() << " exception: " << e.what();
- } catch (...) {
- LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled "
- "non-exception object";
- }
- thread->idle = true;
+ runTask(thread, std::move(task));
}
if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
CHECK(stoppedThreads_.size() == 0);
threadsToStop_ = n;
for (int i = 0; i < n; i++) {
- taskQueue_->add(Task());
+ taskQueue_->add(CPUTask());
}
}
+// Reports the number of entries currently held by taskQueue_, as given by the
+// queue's own size().
+uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
+  const uint64_t queued = taskQueue_->size();
+  return queued;
+}
+
}} // folly::wangle
class CPUThreadPoolExecutor : public ThreadPoolExecutor {
public:
- struct Task;
+ struct CPUTask;
- // TODO thread naming, perhaps a required input to ThreadFactories
explicit CPUThreadPoolExecutor(
size_t numThreads,
- std::unique_ptr<BlockingQueue<Task>> taskQueue =
- folly::make_unique<LifoSemMPMCQueue<Task>>(
+ std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
+ folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
CPUThreadPoolExecutor::kDefaultMaxQueueSize),
std::unique_ptr<ThreadFactory> threadFactory =
folly::make_unique<NamedThreadFactory>("CPUThreadPool"));
void add(Func func) override;
- struct Task {
- explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {}
- Task() : func(nullptr), poison(true) {}
- Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {}
- Task(const Task&) = default;
- Task& operator=(const Task&) = default;
- Func func;
+ // Queue element of CPUThreadPoolExecutor: a ThreadPoolExecutor::Task plus a
+ // poison flag. stopThreads() enqueues default-constructed (poison) tasks, and
+ // a worker that takes one stops instead of running it.
+ struct CPUTask : public ThreadPoolExecutor::Task {
+ // Must be noexcept move constructible so it can be used in MPMCQueue
+ // Regular task wrapping f; never a poison task.
+ explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {}
+ // Poison task: carries a null func and poison == true.
+ CPUTask() : Task(nullptr), poison(true) {}
+ // Moves the Task base out of o and copies o's poison flag.
+ CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {}
+ CPUTask(const CPUTask&) = default;
+ CPUTask& operator=(const CPUTask&) = default;
+ // True only for default-constructed tasks.
bool poison;
- // TODO per-task stats, timeouts, expirations
};
static const size_t kDefaultMaxQueueSize;
private:
void threadRun(ThreadPtr thread) override;
void stopThreads(size_t n) override;
+ uint64_t getPendingTaskCount() override;
- std::unique_ptr<BlockingQueue<Task>> taskQueue_;
+ std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
std::atomic<ssize_t> threadsToStop_{0};
};
typedef std::function<void()> Func;
+namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h
+
// Minimal executor interface: implementations accept work through add().
class Executor {
 public:
  // Virtual so an implementation can be destroyed through an Executor pointer.
  virtual ~Executor() = default;

  // Hands func to the executor; how and where it runs is decided by the
  // implementation.
  virtual void add(Func func) = 0;
};
+}
+
}} // folly::wangle
auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
auto ioThread = std::static_pointer_cast<IOThread>(thread);
- auto moveFunc = folly::makeMoveWrapper(std::move(func));
- auto wrappedFunc = [moveFunc, ioThread] () {
- (*moveFunc)();
- ioThread->outstandingTasks--;
+ auto moveTask = folly::makeMoveWrapper(Task(std::move(func)));
+ auto wrappedFunc = [this, ioThread, moveTask] () mutable {
+ runTask(ioThread, std::move(*moveTask));
+ ioThread->pendingTasks--;
};
- ioThread->outstandingTasks++;
+ ioThread->pendingTasks++;
if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
- ioThread->outstandingTasks--;
+ ioThread->pendingTasks--;
throw std::runtime_error("Unable to run func in event base thread");
}
}
ioThread->eventBase.loopForever();
}
if (isJoin_) {
- while (ioThread->outstandingTasks > 0) {
+ while (ioThread->pendingTasks > 0) {
ioThread->eventBase.loopOnce();
}
}
stoppedThreads_.add(ioThread);
}
+// threadListLock_ is writelocked
void IOThreadPoolExecutor::stopThreads(size_t n) {
for (int i = 0; i < n; i++) {
const auto ioThread = std::static_pointer_cast<IOThread>(
}
}
+// threadListLock_ is readlocked
+// Adds up the pendingTasks counter of every thread in threadList_. A thread
+// that is not idle contributes one less than its counter (presumably so the
+// task it is currently running is not reported as pending).
+uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
+  uint64_t total = 0;
+  for (const auto& entry : threadList_.get()) {
+    const auto io = std::static_pointer_cast<IOThread>(entry);
+    const size_t queued = io->pendingTasks;
+    if (queued == 0) {
+      // Nothing outstanding on this thread; the idle flag is irrelevant.
+      continue;
+    }
+    total += io->idle ? queued : queued - 1;
+  }
+  return total;
+}
+
}} // folly::wangle
ThreadPtr makeThread() override;
void threadRun(ThreadPtr thread) override;
void stopThreads(size_t n) override;
+ uint64_t getPendingTaskCount() override;
+ // Per-thread state of IOThreadPoolExecutor: a Thread that owns an EventBase.
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
- IOThread() : shouldRun(true), outstandingTasks(0) {};
+ IOThread() : shouldRun(true), pendingTasks(0) {};
+ // Starts out true. NOTE(review): its readers/writers are not visible here.
std::atomic<bool> shouldRun;
- std::atomic<size_t> outstandingTasks;
+ // Incremented when a task is handed to eventBase; decremented once the task
+ // has run, or right away if handing it over fails.
+ std::atomic<size_t> pendingTasks;
+ // Event loop on which this thread's tasks are scheduled.
EventBase eventBase;
};
CHECK(threadList_.get().size() == 0);
}
+// Runs task on the given pool thread. The thread is flagged non-idle for the
+// duration, wait and run times are recorded through task.started() and
+// task.completed(), and the resulting stats are published to
+// taskStatsSubject_. Anything thrown by task.func is logged and not rethrown.
+void ThreadPoolExecutor::runTask(
+    const ThreadPtr& thread,
+    Task&& task) {
+  thread->idle = false;
+  task.started();
+  try {
+    task.func();
+  } catch (const std::exception& ex) {
+    LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
+               << typeid(ex).name() << " exception: " << ex.what();
+  } catch (...) {
+    LOG(ERROR)
+        << "ThreadPoolExecutor: func threw unhandled non-exception object";
+  }
+  // Stats are completed and published even when func threw.
+  task.completed();
+  taskStatsSubject_.onNext(std::move(task.stats));
+  thread->idle = true;
+}
+
// Returns the current number of pool threads, read while holding
// threadListLock_ for reading.
size_t ThreadPoolExecutor::numThreads() {
  // The holder has to be a named local. An unnamed `ReadHolder{...}` is a
  // temporary that is destroyed at the end of its own statement, which would
  // release the lock before the list is read.
  RWSpinLock::ReadHolder readGuard{&threadListLock_};
  return threadList_.get().size();
CHECK(threadList_.get().size() == n);
}
+// threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) {
for (int i = 0; i < n; i++) {
auto thread = makeThread();
}
}
+// threadListLock_ is writelocked
void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
CHECK(n <= threadList_.get().size());
CHECK(stoppedThreads_.size() == 0);
CHECK(threadList_.get().size() == 0);
}
+// Takes a snapshot of the pool: total, idle and active thread counts, the
+// pending task count reported by getPendingTaskCount(), and totalTaskCount
+// computed as pending tasks plus active threads.
+ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
+  // The holder has to be a named local so the read lock stays held for the
+  // whole function. An unnamed `ReadHolder{...}` is a temporary that releases
+  // the lock at the end of its own statement, leaving the thread scan and the
+  // getPendingTaskCount() call (which expects the lock to be readlocked)
+  // unprotected.
+  RWSpinLock::ReadHolder readGuard{&threadListLock_};
+  ThreadPoolExecutor::PoolStats stats;
+  stats.threadCount = threadList_.get().size();
+  // Iterate by const reference: copying each shared_ptr is not needed here.
+  for (const auto& thread : threadList_.get()) {
+    if (thread->idle) {
+      stats.idleThreadCount++;
+    } else {
+      stats.activeThreadCount++;
+    }
+  }
+  stats.pendingTaskCount = getPendingTaskCount();
+  stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount;
+  return stats;
+}
+
std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
void ThreadPoolExecutor::StoppedThreadQueue::add(
#include <folly/experimental/wangle/concurrent/Executor.h>
#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/experimental/wangle/rx/Observable.h>
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
namespace folly { namespace wangle {
-class ThreadPoolExecutor : public Executor {
+class ThreadPoolExecutor : public experimental::Executor {
public:
explicit ThreadPoolExecutor(
size_t numThreads,
void setNumThreads(size_t numThreads);
void stop();
void join();
- // TODO expose stats
+
+  // Snapshot of pool occupancy as filled in by getPoolStats(). Every counter
+  // starts at zero.
+  struct PoolStats {
+    PoolStats() {}
+
+    // Thread counts: all threads, then the idle / non-idle split.
+    size_t threadCount = 0;
+    size_t idleThreadCount = 0;
+    size_t activeThreadCount = 0;
+
+    // Tasks still waiting to run, and waiting tasks plus active threads.
+    uint64_t pendingTaskCount = 0;
+    uint64_t totalTaskCount = 0;
+  };
+
+ PoolStats getPoolStats();
+
+  // Per-task measurements published to task-stats observers after a task has
+  // been run.
+  struct TaskStats {
+    TaskStats() {}
+
+    // Starts out false; not written by any code visible in this change.
+    bool expired = false;
+    // Time between the Task's construction and the moment it started running.
+    std::chrono::microseconds waitTime{0};
+    // Time between the task starting and completing.
+    std::chrono::microseconds runTime{0};
+  };
+
+ // Registers observer with taskStatsSubject_ and returns the resulting
+ // Subscription. runTask() publishes each task's TaskStats to that subject
+ // once the task has finished running.
+ Subscription subscribeToTaskStats(
+ const ObserverPtr<TaskStats>& observer) {
+ return taskStatsSubject_.subscribe(observer);
+ }
protected:
+ // Prerequisite: threadListLock_ writelocked
void addThreads(size_t n);
+ // Prerequisite: threadListLock_ writelocked
void removeThreads(size_t n, bool isJoin);
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
uint64_t id;
std::thread handle;
bool idle;
- // TODO per-thread stats go here
};
typedef std::shared_ptr<Thread> ThreadPtr;
+  // A unit of work together with the bookkeeping used to time it.
+  struct Task {
+    // Assume that the task is enqueued on creation, so the wait interval
+    // starts at construction time.
+    explicit Task(Func&& f)
+        : func(std::move(f)),
+          intervalBegin(std::chrono::steady_clock::now()) {}
+
+    Func func;
+    TaskStats stats;
+    // TODO per-task timeouts, expirations
+
+    // Ends the wait interval: stores the elapsed time in stats.waitTime and
+    // restarts the interval so that the run can be timed.
+    void started() {
+      const auto startedAt = std::chrono::steady_clock::now();
+      const auto waited = startedAt - intervalBegin;
+      stats.waitTime =
+          std::chrono::duration_cast<std::chrono::microseconds>(waited);
+      intervalBegin = startedAt;
+    }
+
+    // Ends the run interval: stores the elapsed time in stats.runTime.
+    void completed() {
+      const auto ran = std::chrono::steady_clock::now() - intervalBegin;
+      stats.runTime =
+          std::chrono::duration_cast<std::chrono::microseconds>(ran);
+    }
+
+    // Start of the interval currently being measured (wait, then run).
+    std::chrono::steady_clock::time_point intervalBegin;
+  };
+
+ void runTask(const ThreadPtr& thread, Task&& task);
+
// The function that will be bound to pool threads
virtual void threadRun(ThreadPtr thread) = 0;
- // Stop n threads and put their Thread structs in the threadsStopped_ queue
+
+ // Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue
+ // Prerequisite: threadListLock_ writelocked
virtual void stopThreads(size_t n) = 0;
+
// Create a suitable Thread struct. The default returns a plain Thread; the
// function is virtual so that a subclass can supply its own Thread-derived
// type instead.
virtual ThreadPtr makeThread() {
return std::make_shared<Thread>();
}
- // need a stopThread(id) for keepalive feature
+
+ // Prerequisite: threadListLock_ readlocked
+ virtual uint64_t getPendingTaskCount() = 0;
class ThreadList {
public:
RWSpinLock threadListLock_;
StoppedThreadQueue stoppedThreads_;
std::atomic<bool> isJoin_; // whether the current downsizing is a join
+
+ Subject<TaskStats> taskStatsSubject_;
};
}} // folly::wangle
using namespace folly::wangle;
+// Builds a task that blocks whichever thread runs it for ms milliseconds.
+static Func burnMs(uint64_t ms) {
+  const std::chrono::milliseconds pause(ms);
+  return [pause]() { std::this_thread::sleep_for(pause); };
+}
+
template <class TPE>
static void basic() {
// Create and destroy
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
// Runs the shared resizeUnderLoad() scenario against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
resizeUnderLoad<IOThreadPoolExecutor>();
}
+
+// Checks getPoolStats() on a 10-thread pool of type TPE in two situations:
+// more tasks than threads, and fewer tasks than threads. Each task sleeps for
+// 20 ms and the stats are sampled after a 10 ms sleep, so the expected counts
+// rely on every submitted task having started but none having finished yet.
+template <class TPE>
+static void poolStats() {
+ // Oversubscribed: 20 tasks on 10 threads, so all threads are expected to be
+ // busy with 10 tasks still pending.
+ {
+ TPE tpe(10);
+ for (int i = 0; i < 20; i++) {
+ tpe.add(burnMs(20));
+ }
+ burnMs(10)();
+ auto stats = tpe.getPoolStats();
+ EXPECT_EQ(10, stats.threadCount);
+ EXPECT_EQ(0, stats.idleThreadCount);
+ EXPECT_EQ(10, stats.activeThreadCount);
+ EXPECT_EQ(10, stats.pendingTaskCount);
+ EXPECT_EQ(20, stats.totalTaskCount);
+ }
+
+ // Undersubscribed: 5 tasks on 10 threads, so half the threads are expected
+ // to be idle and nothing should be pending.
+ {
+ TPE tpe(10);
+ for (int i = 0; i < 5; i++) {
+ tpe.add(burnMs(20));
+ }
+ burnMs(10)();
+ auto stats = tpe.getPoolStats();
+ EXPECT_EQ(10, stats.threadCount);
+ EXPECT_EQ(5, stats.idleThreadCount);
+ EXPECT_EQ(5, stats.activeThreadCount);
+ EXPECT_EQ(0, stats.pendingTaskCount);
+ EXPECT_EQ(5, stats.totalTaskCount);
+ }
+}
+
+// Runs the shared poolStats() scenario against CPUThreadPoolExecutor.
+TEST(ThreadPoolExecutorTest, CPUPoolStats) {
+ poolStats<CPUThreadPoolExecutor>();
+}
+
+// Runs the shared poolStats() scenario against IOThreadPoolExecutor.
+TEST(ThreadPoolExecutorTest, IOPoolStats) {
+ poolStats<IOThreadPoolExecutor>();
+}
+
+// Submits ten 20 ms tasks followed by ten 10 ms tasks to a 10-thread pool of
+// type TPE and checks the TaskStats published for each of them.
+// The thresholds assume that the ten 20 ms tasks start almost immediately and
+// are reported first, and that the 10 ms tasks have to wait behind them; the
+// test is therefore sensitive to scheduling delays.
+template <class TPE>
+static void taskStats() {
+ TPE tpe(10);
+ // Number of stats notifications received so far.
+ std::atomic<int> c(0);
+ // NOTE(review): the Subscription returned here is discarded. Confirm that
+ // dropping it does not unsubscribe the observer.
+ tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
+ [&] (ThreadPoolExecutor::TaskStats stats) {
+ int i = c++;
+ if (i < 10) {
+ // First ten reports: waited at most 10 ms, ran for at least 20 ms.
+ EXPECT_GE(10000, stats.waitTime.count());
+ EXPECT_LE(20000, stats.runTime.count());
+ } else {
+ // Remaining reports: waited at least 10 ms, ran for at least 10 ms.
+ EXPECT_LE(10000, stats.waitTime.count());
+ EXPECT_LE(10000, stats.runTime.count());
+ }
+ }));
+ for (int i = 0; i < 10; i++) {
+ tpe.add(burnMs(20));
+ }
+ for (int i = 0; i < 10; i++) {
+ tpe.add(burnMs(10));
+ }
+ tpe.join();
+ // One notification is expected per submitted task.
+ EXPECT_EQ(20, c);
+}
+
+// Runs the shared taskStats() scenario against CPUThreadPoolExecutor.
+TEST(ThreadPoolExecutorTest, CPUTaskStats) {
+ taskStats<CPUThreadPoolExecutor>();
+}
+
+// Runs the shared taskStats() scenario against IOThreadPoolExecutor.
+TEST(ThreadPoolExecutorTest, IOTaskStats) {
+ taskStats<IOThreadPoolExecutor>();
+}
#pragma once
#include <folly/ExceptionWrapper.h>
+#include <folly/wangle/Executor.h>
namespace folly { namespace wangle {
typedef folly::exception_wrapper Error;