user-defined expirations
[folly.git] / folly / experimental / wangle / concurrent / IOThreadPoolExecutor.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
18
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
21
22 namespace folly { namespace wangle {
23
24 IOThreadPoolExecutor::IOThreadPoolExecutor(
25     size_t numThreads,
26     std::unique_ptr<ThreadFactory> threadFactory)
27   : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
28     nextThread_(0) {
29   addThreads(numThreads);
30   CHECK(threadList_.get().size() == numThreads);
31 }
32
33 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
34   stop();
35 }
36
37 void IOThreadPoolExecutor::add(Func func) {
38   add(std::move(func), std::chrono::milliseconds(0));
39 }
40
41 void IOThreadPoolExecutor::add(
42     Func func,
43     std::chrono::milliseconds expiration,
44     Func expireCallback) {
45   RWSpinLock::ReadHolder{&threadListLock_};
46   if (threadList_.get().empty()) {
47     throw std::runtime_error("No threads available");
48   }
49   auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
50   auto ioThread = std::static_pointer_cast<IOThread>(thread);
51
52   auto moveTask = folly::makeMoveWrapper(
53       Task(std::move(func), expiration, std::move(expireCallback)));
54   auto wrappedFunc = [this, ioThread, moveTask] () mutable {
55     runTask(ioThread, std::move(*moveTask));
56     ioThread->pendingTasks--;
57   };
58
59   ioThread->pendingTasks++;
60   if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
61     ioThread->pendingTasks--;
62     throw std::runtime_error("Unable to run func in event base thread");
63   }
64 }
65
66 std::shared_ptr<ThreadPoolExecutor::Thread>
67 IOThreadPoolExecutor::makeThread() {
68   return std::make_shared<IOThread>();
69 }
70
71 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
72   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
73   while (ioThread->shouldRun) {
74     ioThread->eventBase.loopForever();
75   }
76   if (isJoin_) {
77     while (ioThread->pendingTasks > 0) {
78       ioThread->eventBase.loopOnce();
79     }
80   }
81   stoppedThreads_.add(ioThread);
82 }
83
84 // threadListLock_ is writelocked
85 void IOThreadPoolExecutor::stopThreads(size_t n) {
86   for (int i = 0; i < n; i++) {
87     const auto ioThread = std::static_pointer_cast<IOThread>(
88         threadList_.get()[i]);
89     ioThread->shouldRun = false;
90     ioThread->eventBase.terminateLoopSoon();
91   }
92 }
93
94 // threadListLock_ is readlocked
95 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
96   uint64_t count = 0;
97   for (const auto& thread : threadList_.get()) {
98     auto ioThread = std::static_pointer_cast<IOThread>(thread);
99     size_t pendingTasks = ioThread->pendingTasks;
100     if (pendingTasks > 0 && !ioThread->idle) {
101       pendingTasks--;
102     }
103     count += pendingTasks;
104   }
105   return count;
106 }
107
108 }} // folly::wangle