move wangle/concurrent to folly/executors
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 6 Sep 2017 06:09:06 +0000 (23:09 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 6 Sep 2017 06:20:17 +0000 (23:20 -0700)
Summary:
For the initial cutover, just pull the copies in folly into the wangle namespace
The main problem with that approach is that forward declarations of wangle components no longer work, so I fixed those manually

ALSO, IMPORTANT: This is a great, once in a lifetime opportunity to rename/restructure these components. I have a few ideas that I'll noodle and share eventually, e.g. changing LifoSemMPMCQueue so it's just descriptive and not an enumeration of implementation details. Please chip in with yr ideas!

Reviewed By: yfeldblum

Differential Revision: D5694213

fbshipit-source-id: 4fc0ea9359d1216191676fc9729fb53a3f06339f

39 files changed:
CMakeLists.txt
folly/Makefile.am
folly/Singleton.h
folly/docs/Executors.md [new file with mode: 0644]
folly/executors/Async.h [new file with mode: 0644]
folly/executors/BlockingQueue.h [new file with mode: 0644]
folly/executors/CPUThreadPoolExecutor.cpp [new file with mode: 0644]
folly/executors/CPUThreadPoolExecutor.h [new file with mode: 0644]
folly/executors/Codel.cpp [new file with mode: 0644]
folly/executors/Codel.h [new file with mode: 0644]
folly/executors/FiberIOExecutor.h [new file with mode: 0644]
folly/executors/FutureExecutor.h [new file with mode: 0644]
folly/executors/GlobalExecutor.cpp [new file with mode: 0644]
folly/executors/GlobalExecutor.h [new file with mode: 0644]
folly/executors/IOExecutor.h [new file with mode: 0644]
folly/executors/IOObjectCache.h [new file with mode: 0644]
folly/executors/IOThreadPoolExecutor.cpp [new file with mode: 0644]
folly/executors/IOThreadPoolExecutor.h [new file with mode: 0644]
folly/executors/LifoSemMPMCQueue.h [new file with mode: 0644]
folly/executors/NamedThreadFactory.h [new file with mode: 0644]
folly/executors/PriorityLifoSemMPMCQueue.h [new file with mode: 0644]
folly/executors/PriorityThreadFactory.h [new file with mode: 0644]
folly/executors/SerialExecutor.cpp [new file with mode: 0644]
folly/executors/SerialExecutor.h [new file with mode: 0644]
folly/executors/ThreadFactory.h [new file with mode: 0644]
folly/executors/ThreadPoolExecutor.cpp [new file with mode: 0644]
folly/executors/ThreadPoolExecutor.h [new file with mode: 0644]
folly/executors/ThreadedExecutor.cpp [new file with mode: 0644]
folly/executors/ThreadedExecutor.h [new file with mode: 0644]
folly/executors/UnboundedBlockingQueue.h [new file with mode: 0644]
folly/executors/test/AsyncTest.cpp [new file with mode: 0644]
folly/executors/test/CodelTest.cpp [new file with mode: 0644]
folly/executors/test/FiberIOExecutorTest.cpp [new file with mode: 0644]
folly/executors/test/GlobalExecutorTest.cpp [new file with mode: 0644]
folly/executors/test/SerialExecutorTest.cpp [new file with mode: 0644]
folly/executors/test/ThreadPoolExecutorTest.cpp [new file with mode: 0644]
folly/executors/test/ThreadedExecutorTest.cpp [new file with mode: 0644]
folly/executors/test/UnboundedBlockingQueueTest.cpp [new file with mode: 0644]
folly/futures/README.md

index c067fb7..6c72796 100755 (executable)
@@ -296,6 +296,15 @@ if (BUILD_TESTS)
   folly_define_tests(
     DIRECTORY concurrency/test/
       TEST cache_locality_test SOURCES CacheLocalityTest.cpp
+    DIRECTORY executors/test/
+      TEST async_test SOURCES AsyncTest.cpp
+      TEST codel_test SOURCES CodelTest.cpp
+      TEST fiber_io_executor_test SOURCES FiberIOExecutorTest.cpp
+      TEST global_executor_test SOURCES GlobalExecutorTest.cpp
+      TEST serial_executor_test SOURCES SerialExecutorTest.cpp
+      TEST thread_pool_executor_test SOURCES ThreadPoolExecutorTest.cpp
+      TEST threaded_executor_test SOURCES ThreadedExecutorTest.cpp
+      TEST unbounded_blocking_queue_test SOURCES UnboundedBlockingQueueTest.cpp
     DIRECTORY experimental/test/
       TEST autotimer_test SOURCES AutoTimerTest.cpp
       TEST bits_test_2 SOURCES BitsTest.cpp
index 442c1e1..cb6c107 100644 (file)
@@ -85,6 +85,25 @@ nobase_follyinclude_HEADERS = \
        detail/ThreadLocalDetail.h \
        detail/TurnSequencer.h \
        detail/UncaughtExceptionCounter.h \
+       executors/Async.h \
+       executors/BlockingQueue.h \
+       executors/CPUThreadPoolExecutor.h \
+       executors/Codel.h \
+       executors/FiberIOExecutor.h \
+       executors/FutureExecutor.h \
+       executors/GlobalExecutor.h \
+       executors/IOExecutor.h \
+       executors/IOObjectCache.h \
+       executors/IOThreadPoolExecutor.h \
+       executors/LifoSemMPMCQueue.h \
+       executors/NamedThreadFactory.h \
+       executors/PriorityLifoSemMPMCQueue.h \
+       executors/PriorityThreadFactory.h \
+       executors/SerialExecutor.h \
+       executors/ThreadFactory.h \
+       executors/ThreadPoolExecutor.h \
+       executors/ThreadedExecutor.h \
+       executors/UnboundedBlockingQueue.h \
        Demangle.h \
        DiscriminatedPtr.h \
        DynamicConverter.h \
@@ -487,6 +506,13 @@ libfolly_la_SOURCES = \
        futures/QueuedImmediateExecutor.cpp \
        futures/ThreadWheelTimekeeper.cpp \
        futures/test/TestExecutor.cpp \
+       executors/CPUThreadPoolExecutor.cpp \
+       executors/Codel.cpp \
+       executors/GlobalExecutor.cpp \
+       executors/IOThreadPoolExecutor.cpp \
+       executors/SerialExecutor.cpp \
+       executors/ThreadPoolExecutor.cpp \
+       executors/ThreadedExecutor.cpp \
        experimental/hazptr/hazptr.cpp \
        experimental/hazptr/memory_resource.cpp \
        GlobalThreadPoolList.cpp \
index 975bab5..028cf7f 100644 (file)
@@ -405,7 +405,7 @@ class SingletonVault {
    *
    * Sample usage:
    *
-   *   wangle::IOThreadPoolExecutor executor(max_concurrency_level);
+   *   folly::IOThreadPoolExecutor executor(max_concurrency_level);
    *   folly::Baton<> done;
    *   doEagerInitVia(executor, &done);
    *   done.wait();  // or 'timed_wait', or spin with 'try_wait'
diff --git a/folly/docs/Executors.md b/folly/docs/Executors.md
new file mode 100644 (file)
index 0000000..8491bcf
--- /dev/null
@@ -0,0 +1,63 @@
+<section class="dex_document"><h1>Thread pools &amp; Executors</h1><p class="dex_introduction">Run your concurrent code in a performant way</p><h2 id="all-about-thread-pools">All about thread pools <a href="#all-about-thread-pools" class="headerLink">#</a></h2>
+
+<h3 id="how-do-i-use-the-thread">How do I use the thread pools? <a href="#how-do-i-use-the-thread" class="headerLink">#</a></h3>
+
+<p>Wangle provides two concrete thread pools (IOThreadPoolExecutor, CPUThreadPoolExecutor) as well as building them in as part of a complete async framework.  Generally you might want to grab the global executor, and use it with a future, like this:</p>
+
+<div class="remarkup-code-block" data-code-lang="php"><pre class="remarkup-code"><span class="no">auto</span> <span class="no">f</span> <span class="o">=</span> <span class="nf" data-symbol-name="someFutureFunction">someFutureFunction</span><span class="o">().</span><span class="nf" data-symbol-name="via">via</span><span class="o">(</span><span class="nf" data-symbol-name="getCPUExecutor">getCPUExecutor</span><span class="o">()).</span><span class="nf" data-symbol-name="then">then</span><span class="o">(...)</span></pre></div>
+
+<p>Or maybe you need to construct a thrift/memcache client, and need an event base:</p>
+
+<div class="remarkup-code-block" data-code-lang="php"><pre class="remarkup-code"><span class="no">auto</span> <span class="no">f</span> <span class="o">=</span> <span class="nf" data-symbol-name="getClient">getClient</span><span class="o">(</span><span class="nf" data-symbol-name="getIOExecutor">getIOExecutor</span><span class="o">()-&gt;</span><span class="na" data-symbol-name="getEventBase">getEventBase</span><span class="o">())-&gt;</span><span class="na" data-symbol-name="callSomeFunction">callSomeFunction</span><span class="o">(</span><span class="no">args</span><span class="o">...)</span>
+         <span class="o">.</span><span class="nf" data-symbol-name="via">via</span><span class="o">(</span><span class="nf" data-symbol-name="getCPUExecutor">getCPUExecutor</span><span class="o">())</span>
+         <span class="o">.</span><span class="nf" data-symbol-name="then">then</span><span class="o">([](</span><span class="no">Result</span> <span class="no">r</span><span class="o">)&#123;</span> <span class="o">....</span> <span class="k">do</span> <span class="no">something</span> <span class="no">with</span> <span class="no">result</span><span class="o">&#125;);</span></pre></div>
+
+<h3 id="vs-c-11-s-std-launch">vs. C++11&#039;s std::launch <a href="#vs-c-11-s-std-launch" class="headerLink">#</a></h3>
+
+<p>The current C++11 std::launch only has two modes: async or deferred.  In a production system, neither is what you want:  async will launch a new thread for every launch without limit, while deferred will defer the work until it is needed lazily, but then do the work <strong>in the current thread synchronously</strong> when it is needed.</p>
+
+<p>Wangle&#039;s thread pools always launch work as soon as possible, have limits to the maximum number of tasks / threads allowed, so we will never use more threads than absolutely needed.  See implementation details below about each type of executor.</p>
+
+<h3 id="why-do-we-need-yet-anoth">Why do we need yet another set of thread pools? <a href="#why-do-we-need-yet-anoth" class="headerLink">#</a></h3>
+
+<p>Unfortunately none of the existing thread pools had every feature needed - things based on pipes are too slow.   Several older ones didn&#039;t support std::function.</p>
+
+<h3 id="why-do-we-need-several-d">Why do we need several different types of thread pools? <a href="#why-do-we-need-several-d" class="headerLink">#</a></h3>
+
+<p>If you want epoll support, you need an fd - event_fd is the latest notification hotness.   Unfortunately, an active fd triggers all the epoll loops it is in, leading to thundering herd - so if you want a fair queue (one queue total vs. one queue per worker thread), you need to use some kind of semaphore.  Unfortunately semaphores can&#039;t be put in epoll loops, so they are incompatible with IO.   Fortunately, you usually want to separate the IO and CPU bound work anyway to give stronger tail latency guarantees on IO.</p>
+
+<h3 id="iothreadpoolexecutor">IOThreadPoolExecutor <a href="#iothreadpoolexecutor" class="headerLink">#</a></h3>
+
+<ul>
+<li>Uses event_fd for notification, and waking an epoll loop.</li>
+<li>There is one queue (NotificationQueue specifically) per thread/epoll.</li>
+<li>If the thread is already running and not waiting on epoll, we don&#039;t make any additional syscalls to wake up the loop, just put the new task in the queue.</li>
+<li>If any thread has been waiting for more than a few seconds, its stack is madvised away.   Currently however tasks are scheduled round robin on the queues, so unless there is <strong>no</strong> work going on, this isn&#039;t very effective.</li>
+<li>::getEventBase() will return an EventBase you can schedule IO work on directly, chosen round-robin.</li>
+<li>Since there is one queue per thread, there is hardly any contention on the queues - so a simple spinlock around an std::deque is used for the tasks.  There is no max queue size.</li>
+<li>By default, there is one thread per core - it usually doesn&#039;t make sense to have more IO threads than this, assuming they don&#039;t block.</li>
+</ul>
+
+<h3 id="cputhreadpoolexecutor">CPUThreadPoolExecutor <a href="#cputhreadpoolexecutor" class="headerLink">#</a></h3>
+
+<ul>
+<li>A single queue backed by folly/LifoSem and folly/MPMC queue.  Since there is only a single queue, contention can be quite high, since all the worker threads and all the producer threads hit the same queue.  MPMC queue excels in this situation.  MPMC queue dictates a max queue size.</li>
+<li>LifoSem wakes up threads in Lifo order - i.e. there are only few threads as necessary running, and we always try to reuse the same few threads for better cache locality.</li>
+<li>Inactive threads have their stack madvised away.  This works quite well in combination with Lifosem - it almost doesn&#039;t matter if more threads than are necessary are specified at startup.</li>
+<li>stop() will finish all outstanding tasks at exit</li>
+<li>Supports priorities - priorities are implemented as multiple queues - each worker thread checks the highest priority queue first.  Threads themselves don&#039;t have priorities set, so a series of long running low priority tasks could still hog all the threads.  (at last check pthreads thread priorities didn&#039;t work very well)</li>
+</ul>
+
+<h3 id="threadpoolexecutor">ThreadPoolExecutor <a href="#threadpoolexecutor" class="headerLink">#</a></h3>
+
+<p>Base class that contains the thread startup/shutdown/stats logic, since this is pretty disjoint from how tasks are actually run</p>
+
+<h3 id="observers">Observers <a href="#observers" class="headerLink">#</a></h3>
+
+<p>An observer interface is provided to listen for thread start/stop events.  This is useful to create objects that should be one-per-thread, but also have them work correctly if threads are added/removed from the thread pool.</p>
+
+<h3 id="stats">Stats <a href="#stats" class="headerLink">#</a></h3>
+
+<p>PoolStats are provided to get task count, running time, waiting time, etc.</p>
+</section>
+
diff --git a/folly/executors/Async.h b/folly/executors/Async.h
new file mode 100644 (file)
index 0000000..a2a4eae
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/executors/GlobalExecutor.h>
+#include <folly/futures/Future.h>
+
+namespace folly {
+
+template <class F>
+auto async(F&& fn) {
+  return folly::via<F>(getCPUExecutor().get(), std::forward<F>(fn));
+}
+
+} // namespace folly
diff --git a/folly/executors/BlockingQueue.h b/folly/executors/BlockingQueue.h
new file mode 100644 (file)
index 0000000..4928095
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <exception>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+// Some queue implementations (for example, LifoSemMPMCQueue or
+// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and
+// non-blocking (THROW) behaviors.
+enum class QueueBehaviorIfFull { THROW, BLOCK };
+
+class QueueFullException : public std::runtime_error {
+  using std::runtime_error::runtime_error; // Inherit constructors.
+};
+
+template <class T>
+class BlockingQueue {
+ public:
+  virtual ~BlockingQueue() = default;
+  virtual void add(T item) = 0;
+  virtual void addWithPriority(T item, int8_t /* priority */) {
+    add(std::move(item));
+  }
+  virtual uint8_t getNumPriorities() {
+    return 1;
+  }
+  virtual T take() = 0;
+  virtual size_t size() = 0;
+};
+
+} // namespace folly
diff --git a/folly/executors/CPUThreadPoolExecutor.cpp b/folly/executors/CPUThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..37fd444
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/CPUThreadPoolExecutor.h>
+#include <folly/executors/PriorityLifoSemMPMCQueue.h>
+
+namespace folly {
+
+const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+      taskQueue_(std::move(taskQueue)) {
+  setNumThreads(numThreads);
+}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          std::make_unique<LifoSemMPMCQueue<CPUTask>>(
+              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+          std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    int8_t numPriorities,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+              numPriorities,
+              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+          std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    int8_t numPriorities,
+    size_t maxQueueSize,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+              numPriorities,
+              maxQueueSize),
+          std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
+  stop();
+  CHECK(threadsToStop_ == 0);
+}
+
+void CPUThreadPoolExecutor::add(Func func) {
+  add(std::move(func), std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+    Func func,
+    std::chrono::milliseconds expiration,
+    Func expireCallback) {
+  // TODO handle enqueue failure, here and in other add() callsites
+  taskQueue_->add(
+      CPUTask(std::move(func), expiration, std::move(expireCallback)));
+}
+
+void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
+  add(std::move(func), priority, std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+    Func func,
+    int8_t priority,
+    std::chrono::milliseconds expiration,
+    Func expireCallback) {
+  CHECK(getNumPriorities() > 0);
+  taskQueue_->addWithPriority(
+      CPUTask(std::move(func), expiration, std::move(expireCallback)),
+      priority);
+}
+
+uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
+  return taskQueue_->getNumPriorities();
+}
+
+BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
+CPUThreadPoolExecutor::getTaskQueue() {
+  return taskQueue_.get();
+}
+
+void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+  this->threadPoolHook_.registerThread();
+
+  thread->startupBaton.post();
+  while (1) {
+    auto task = taskQueue_->take();
+    if (UNLIKELY(task.poison)) {
+      CHECK(threadsToStop_-- > 0);
+      for (auto& o : observers_) {
+        o->threadStopped(thread.get());
+      }
+      folly::RWSpinLock::WriteHolder w{&threadListLock_};
+      threadList_.remove(thread);
+      stoppedThreads_.add(thread);
+      return;
+    } else {
+      runTask(thread, std::move(task));
+    }
+
+    if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
+      if (--threadsToStop_ >= 0) {
+        folly::RWSpinLock::WriteHolder w{&threadListLock_};
+        threadList_.remove(thread);
+        stoppedThreads_.add(thread);
+        return;
+      } else {
+        threadsToStop_++;
+      }
+    }
+  }
+}
+
+void CPUThreadPoolExecutor::stopThreads(size_t n) {
+  threadsToStop_ += n;
+  for (size_t i = 0; i < n; i++) {
+    taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
+  }
+}
+
+// threadListLock_ is readlocked
+uint64_t CPUThreadPoolExecutor::getPendingTaskCountImpl(
+    const folly::RWSpinLock::ReadHolder&) {
+  return taskQueue_->size();
+}
+
+} // namespace folly
diff --git a/folly/executors/CPUThreadPoolExecutor.h b/folly/executors/CPUThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..64f1b73
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/ThreadPoolExecutor.h>
+
+namespace folly {
+
+/**
+ * A Thread pool for CPU bound tasks.
+ *
+ * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
+ * Because of this contention can be quite high,
+ * since all the worker threads and all the producer threads hit
+ * the same queue. MPMC queue excels in this situation but dictates a max queue
+ * size.
+ *
+ * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
+ * tasks executing on a given thread pool schedule more tasks, deadlock is
+ * possible if the queue becomes full.  Deadlock is also possible if there is
+ * a circular dependency among multiple thread pools with blocking queues.
+ * To avoid this situation, use non-blocking queue(s), or schedule tasks only
+ * from threads not belonging to the given thread pool(s), or use
+ * folly::IOThreadPoolExecutor.
+ *
+ * @note LifoSem wakes up threads in Lifo order - i.e. there are only few
+ * threads as necessary running, and we always try to reuse the same few threads
+ * for better cache locality.
+ * Inactive threads have their stack madvised away. This works quite well in
+ * combination with Lifosem - it almost doesn't matter if more threads than are
+ * necessary are specified at startup.
+ *
+ * @note stop() will finish all outstanding tasks at exit.
+ *
+ * @note Supports priorities - priorities are implemented as multiple queues -
+ * each worker thread checks the highest priority queue first. Threads
+ * themselves don't have priorities set, so a series of long running low
+ * priority tasks could still hog all the threads. (at last check pthreads
+ * thread priorities didn't work very well).
+ */
+class CPUThreadPoolExecutor : public ThreadPoolExecutor {
+ public:
+  struct CPUTask;
+
+  CPUThreadPoolExecutor(
+      size_t numThreads,
+      std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
+      std::shared_ptr<ThreadFactory> threadFactory =
+          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+  explicit CPUThreadPoolExecutor(size_t numThreads);
+
+  CPUThreadPoolExecutor(
+      size_t numThreads,
+      std::shared_ptr<ThreadFactory> threadFactory);
+
+  CPUThreadPoolExecutor(
+      size_t numThreads,
+      int8_t numPriorities,
+      std::shared_ptr<ThreadFactory> threadFactory =
+          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+  CPUThreadPoolExecutor(
+      size_t numThreads,
+      int8_t numPriorities,
+      size_t maxQueueSize,
+      std::shared_ptr<ThreadFactory> threadFactory =
+          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+  ~CPUThreadPoolExecutor() override;
+
+  void add(Func func) override;
+  void add(
+      Func func,
+      std::chrono::milliseconds expiration,
+      Func expireCallback = nullptr) override;
+
+  void addWithPriority(Func func, int8_t priority) override;
+  void add(
+      Func func,
+      int8_t priority,
+      std::chrono::milliseconds expiration,
+      Func expireCallback = nullptr);
+
+  uint8_t getNumPriorities() const override;
+
+  struct CPUTask : public ThreadPoolExecutor::Task {
+    // Must be noexcept move constructible so it can be used in MPMCQueue
+
+    explicit CPUTask(
+        Func&& f,
+        std::chrono::milliseconds expiration,
+        Func&& expireCallback)
+        : Task(std::move(f), expiration, std::move(expireCallback)),
+          poison(false) {}
+    CPUTask()
+        : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
+
+    bool poison;
+  };
+
+  static const size_t kDefaultMaxQueueSize;
+
+ protected:
+  BlockingQueue<CPUTask>* getTaskQueue();
+
+ private:
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
+
+  std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
+  std::atomic<ssize_t> threadsToStop_{0};
+};
+
+} // namespace folly
diff --git a/folly/executors/Codel.cpp b/folly/executors/Codel.cpp
new file mode 100644 (file)
index 0000000..c84b9be
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/Codel.h>
+
+#include <folly/portability/GFlags.h>
+#include <algorithm>
+
+DEFINE_int32(codel_interval, 100, "Codel default interval time in ms");
+DEFINE_int32(codel_target_delay, 5, "Target codel queueing delay in ms");
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace folly {
+
+Codel::Codel()
+    : codelMinDelay_(0),
+      codelIntervalTime_(std::chrono::steady_clock::now()),
+      codelResetDelay_(true),
+      overloaded_(false) {}
+
+bool Codel::overloaded(std::chrono::nanoseconds delay) {
+  bool ret = false;
+  auto now = std::chrono::steady_clock::now();
+
+  // Avoid another thread updating the value at the same time we are using it
+  // to calculate the overloaded state
+  auto minDelay = codelMinDelay_;
+
+  if (now > codelIntervalTime_ &&
+      // testing before exchanging is more cacheline-friendly
+      (!codelResetDelay_.load(std::memory_order_acquire) &&
+       !codelResetDelay_.exchange(true))) {
+    codelIntervalTime_ = now + getInterval();
+
+    if (minDelay > getTargetDelay()) {
+      overloaded_ = true;
+    } else {
+      overloaded_ = false;
+    }
+  }
+  // Care must be taken that only a single thread resets codelMinDelay_,
+  // and that it happens after the interval reset above
+  if (codelResetDelay_.load(std::memory_order_acquire) &&
+      codelResetDelay_.exchange(false)) {
+    codelMinDelay_ = delay;
+    // More than one request must come in during an interval before codel
+    // starts dropping requests
+    return false;
+  } else if (delay < codelMinDelay_) {
+    codelMinDelay_ = delay;
+  }
+
+  // Here is where we apply different logic than codel proper. Instead of
+  // adapting the interval until the next drop, we slough off requests with
+  // queueing delay > 2*target_delay while in the overloaded regime. This
+  // empirically works better for our services than the codel approach of
+  // increasingly often dropping packets.
+  if (overloaded_ && delay > getSloughTimeout()) {
+    ret = true;
+  }
+
+  return ret;
+}
+
+int Codel::getLoad() {
+  // it might be better to use the average delay instead of minDelay, but we'd
+  // have to track it. aspiring bootcamper?
+  return std::min<int>(100, 100 * getMinDelay() / getSloughTimeout());
+}
+
+nanoseconds Codel::getMinDelay() {
+  return codelMinDelay_;
+}
+
+milliseconds Codel::getInterval() {
+  return milliseconds(FLAGS_codel_interval);
+}
+
+milliseconds Codel::getTargetDelay() {
+  return milliseconds(FLAGS_codel_target_delay);
+}
+
+milliseconds Codel::getSloughTimeout() {
+  return getTargetDelay() * 2;
+}
+
+} // namespace folly
diff --git a/folly/executors/Codel.h b/folly/executors/Codel.h
new file mode 100644 (file)
index 0000000..98718bb
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+
+#include <folly/portability/GFlags.h>
+
+DECLARE_int32(codel_interval);
+DECLARE_int32(codel_target_delay);
+
+namespace folly {
+
+/// CoDel (controlled delay) is an active queue management algorithm from
+/// networking for battling bufferbloat.
+///
+/// Services also have queues (of requests, not packets) and suffer from
+/// queueing delay when overloaded. This class adapts the codel algorithm for
+/// services.
+///
+/// Codel is discussed in depth on the web [1,2], but a basic sketch of the
+/// algorithm is this: if every request has experienced queueing delay greater
+/// than the target (5ms) during the past interval (100ms), then we shed load.
+///
+/// We have adapted the codel algorithm. TCP sheds load by changing windows in
+/// reaction to dropped packets. Codel in a network setting drops packets at
+/// increasingly shorter intervals (100 / sqrt(n)) to achieve a linear change
+/// in throughput. In our experience a different scheme works better for
+/// services: when overloaded slough off requests that we dequeue which have
+/// exceeded an alternate timeout (2 * target_delay).
+///
+/// So in summary, to use this class, calculate the time each request spent in
+/// the queue and feed that delay to overloaded(), which will tell you whether
+/// to expire this request.
+///
+/// You can also ask for an instantaneous load estimate and the minimum delay
+/// observed during this interval.
+///
+///
+/// 1. http://queue.acm.org/detail.cfm?id=2209336
+/// 2. https://en.wikipedia.org/wiki/CoDel
+class Codel {
+ public:
+  Codel();
+
+  /// Returns true if this request should be expired to reduce overload.
+  /// In detail, this returns true if min_delay > target_delay for the
+  /// interval, and this delay > 2 * target_delay.
+  ///
+  /// As you may guess, we observe the clock so this is time sensitive. Call
+  /// it promptly after calculating queueing delay.
+  bool overloaded(std::chrono::nanoseconds delay);
+
+  /// Get the queue load, as seen by the codel algorithm
+  /// Gives a rough guess at how bad the queue delay is.
+  ///
+  ///   min(100%, min_delay / (2 * target_delay))
+  ///
+  /// Return:  0 = no delay, 100 = At the queueing limit
+  int getLoad();
+
+  std::chrono::nanoseconds getMinDelay();
+  std::chrono::milliseconds getInterval();
+  std::chrono::milliseconds getTargetDelay();
+  std::chrono::milliseconds getSloughTimeout();
+
+ private:
+  std::chrono::nanoseconds codelMinDelay_;
+  std::chrono::time_point<std::chrono::steady_clock> codelIntervalTime_;
+
+  // flag to make overloaded() thread-safe, since we only want
+  // to reset the delay once per time period
+  std::atomic<bool> codelResetDelay_;
+
+  bool overloaded_;
+};
+
+} // namespace folly
diff --git a/folly/executors/FiberIOExecutor.h b/folly/executors/FiberIOExecutor.h
new file mode 100644 (file)
index 0000000..18c3618
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/IOExecutor.h>
+#include <folly/fibers/FiberManagerMap.h>
+
+namespace folly {
+
+/**
+ * @class FiberIOExecutor
+ * @brief An IOExecutor that executes funcs under mapped fiber context
+ *
+ * A FiberIOExecutor wraps an IOExecutor, but executes funcs on the FiberManager
+ * mapped to the underlying IOExector's event base.
+ */
+class FiberIOExecutor : public IOExecutor {
+ public:
+  explicit FiberIOExecutor(const std::shared_ptr<IOExecutor>& ioExecutor)
+      : ioExecutor_(ioExecutor) {}
+
+  virtual void add(folly::Function<void()> f) override {
+    auto eventBase = ioExecutor_->getEventBase();
+    folly::fibers::getFiberManager(*eventBase).add(std::move(f));
+  }
+
+  virtual folly::EventBase* getEventBase() override {
+    return ioExecutor_->getEventBase();
+  }
+
+ private:
+  std::shared_ptr<IOExecutor> ioExecutor_;
+};
+
+} // namespace folly
diff --git a/folly/executors/FutureExecutor.h b/folly/executors/FutureExecutor.h
new file mode 100644 (file)
index 0000000..705a82f
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/futures/Future.h>
+
+namespace folly {
+
+template <typename ExecutorImpl>
+class FutureExecutor : public ExecutorImpl {
+ public:
+  template <typename... Args>
+  explicit FutureExecutor(Args&&... args)
+      : ExecutorImpl(std::forward<Args>(args)...) {}
+
+  /*
+   * Given a function func that returns a Future<T>, adds that function to the
+   * contained Executor and returns a Future<T> which will be fulfilled with
+   * func's result once it has been executed.
+   *
+   * For example: auto f = futureExecutor.addFuture([](){
+   *                return doAsyncWorkAndReturnAFuture();
+   *              });
+   */
+  template <typename F>
+  typename std::enable_if<
+      folly::isFuture<typename std::result_of<F()>::type>::value,
+      typename std::result_of<F()>::type>::type
+  addFuture(F func) {
+    typedef typename std::result_of<F()>::type::value_type T;
+    folly::Promise<T> promise;
+    auto future = promise.getFuture();
+    ExecutorImpl::add(
+        [ promise = std::move(promise), func = std::move(func) ]() mutable {
+          func().then([promise = std::move(promise)](
+              folly::Try<T> && t) mutable { promise.setTry(std::move(t)); });
+        });
+    return future;
+  }
+
+  /*
+   * Similar to addFuture above, but takes a func that returns some non-Future
+   * type T.
+   *
+   * For example: auto f = futureExecutor.addFuture([]() {
+   *                return 42;
+   *              });
+   */
+  template <typename F>
+  typename std::enable_if<
+      !folly::isFuture<typename std::result_of<F()>::type>::value,
+      folly::Future<typename folly::Unit::Lift<
+          typename std::result_of<F()>::type>::type>>::type
+  addFuture(F func) {
+    using T =
+        typename folly::Unit::Lift<typename std::result_of<F()>::type>::type;
+    folly::Promise<T> promise;
+    auto future = promise.getFuture();
+    ExecutorImpl::add(
+        [ promise = std::move(promise), func = std::move(func) ]() mutable {
+          promise.setWith(std::move(func));
+        });
+    return future;
+  }
+};
+
+} // namespace folly
diff --git a/folly/executors/GlobalExecutor.cpp b/folly/executors/GlobalExecutor.cpp
new file mode 100644 (file)
index 0000000..d2de51d
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Singleton.h>
+#include <folly/executors/IOExecutor.h>
+#include <folly/executors/IOThreadPoolExecutor.h>
+#include <folly/futures/InlineExecutor.h>
+
+using namespace folly;
+
+namespace {
+
+// lock protecting global CPU executor
+struct CPUExecutorLock {};
+Singleton<RWSpinLock, CPUExecutorLock> globalCPUExecutorLock;
+// global CPU executor
+Singleton<std::weak_ptr<Executor>> globalCPUExecutor;
+// default global CPU executor is an InlineExecutor
+Singleton<std::shared_ptr<InlineExecutor>> globalInlineExecutor([] {
+  return new std::shared_ptr<InlineExecutor>(
+      std::make_shared<InlineExecutor>());
+});
+
+// lock protecting global IO executor
+struct IOExecutorLock {};
+Singleton<RWSpinLock, IOExecutorLock> globalIOExecutorLock;
+// global IO executor
+Singleton<std::weak_ptr<IOExecutor>> globalIOExecutor;
+// default global IO executor is an IOThreadPoolExecutor
+Singleton<std::shared_ptr<IOThreadPoolExecutor>> globalIOThreadPool([] {
+  return new std::shared_ptr<IOThreadPoolExecutor>(
+      std::make_shared<IOThreadPoolExecutor>(
+          sysconf(_SC_NPROCESSORS_ONLN),
+          std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
+});
+}
+
+namespace folly {
+
+template <class Exe, class DefaultExe, class LockTag>
+std::shared_ptr<Exe> getExecutor(
+    Singleton<std::weak_ptr<Exe>>& sExecutor,
+    Singleton<std::shared_ptr<DefaultExe>>& sDefaultExecutor,
+    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+  std::shared_ptr<Exe> executor;
+  auto singleton = sExecutor.try_get();
+  auto lock = sExecutorLock.try_get();
+
+  {
+    RWSpinLock::ReadHolder guard(lock.get());
+    if ((executor = sExecutor.try_get()->lock())) {
+      return executor;
+    }
+  }
+
+  RWSpinLock::WriteHolder guard(lock.get());
+  executor = singleton->lock();
+  if (!executor) {
+    std::weak_ptr<Exe> defaultExecutor = *sDefaultExecutor.try_get().get();
+    executor = defaultExecutor.lock();
+    sExecutor.try_get().get()->swap(defaultExecutor);
+  }
+  return executor;
+}
+
+template <class Exe, class LockTag>
+void setExecutor(
+    std::weak_ptr<Exe> executor,
+    Singleton<std::weak_ptr<Exe>>& sExecutor,
+    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+  auto lock = sExecutorLock.try_get();
+  RWSpinLock::WriteHolder guard(*lock);
+  std::weak_ptr<Exe> executor_weak = std::move(executor);
+  sExecutor.try_get().get()->swap(executor_weak);
+}
+
+std::shared_ptr<Executor> getCPUExecutor() {
+  return getExecutor(
+      globalCPUExecutor, globalInlineExecutor, globalCPUExecutorLock);
+}
+
+void setCPUExecutor(std::weak_ptr<Executor> executor) {
+  setExecutor(std::move(executor), globalCPUExecutor, globalCPUExecutorLock);
+}
+
+std::shared_ptr<IOExecutor> getIOExecutor() {
+  return getExecutor(
+      globalIOExecutor, globalIOThreadPool, globalIOExecutorLock);
+}
+
+EventBase* getEventBase() {
+  return getIOExecutor()->getEventBase();
+}
+
+void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
+  setExecutor(std::move(executor), globalIOExecutor, globalIOExecutorLock);
+}
+
+} // namespace folly
diff --git a/folly/executors/GlobalExecutor.h b/folly/executors/GlobalExecutor.h
new file mode 100644 (file)
index 0000000..79f5dce
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include <folly/Executor.h>
+#include <folly/executors/IOExecutor.h>
+
+namespace folly {
+
+// Retrieve the global Executor. If there is none, a default InlineExecutor
+// will be constructed and returned. This is named CPUExecutor to distinguish
+// it from IOExecutor below and to hint that it's intended for CPU-bound tasks.
+std::shared_ptr<folly::Executor> getCPUExecutor();
+
+// Set an Executor to be the global Executor which will be returned by
+// subsequent calls to getCPUExecutor().
+void setCPUExecutor(std::weak_ptr<folly::Executor> executor);
+
+// Retrieve the global IOExecutor. If there is none, a default
+// IOThreadPoolExecutor will be constructed and returned.
+//
+// IOExecutors differ from Executors in that they drive and provide access to
+// one or more EventBases.
+std::shared_ptr<IOExecutor> getIOExecutor();
+
+// Retrieve an event base from the global IOExecutor
+folly::EventBase* getEventBase();
+
+// Set an IOExecutor to be the global IOExecutor which will be returned by
+// subsequent calls to getIOExecutor().
+void setIOExecutor(std::weak_ptr<IOExecutor> executor);
+
+} // namespace folly
diff --git a/folly/executors/IOExecutor.h b/folly/executors/IOExecutor.h
new file mode 100644 (file)
index 0000000..223c049
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Executor.h>
+
+namespace folly {
+class EventBase;
+}
+
+namespace folly {
+
+// An IOExecutor is an executor that operates on at least one EventBase.  One of
+// these EventBases should be accessible via getEventBase(). The event base
+// returned by a call to getEventBase() is implementation dependent.
+//
+// Note that IOExecutors don't necessarily loop on the base themselves - for
+// instance, EventBase itself is an IOExecutor but doesn't drive itself.
+//
+// Implementations of IOExecutor are eligible to become the global IO executor,
+// returned on every call to getIOExecutor(), via setIOExecutor().
+// These functions are declared in GlobalExecutor.h
+//
+// If getIOExecutor is called and none has been set, a default global
+// IOThreadPoolExecutor will be created and returned.
+class IOExecutor : public virtual folly::Executor {
+ public:
+  ~IOExecutor() override = default;
+  virtual folly::EventBase* getEventBase() = 0;
+};
+
+} // namespace folly
diff --git a/folly/executors/IOObjectCache.h b/folly/executors/IOObjectCache.h
new file mode 100644 (file)
index 0000000..a271d6e
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ThreadLocal.h>
+#include <folly/executors/GlobalExecutor.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+/*
+ * IOObjectCache manages objects of type T that are dependent on an EventBase
+ * provided by the global IOExecutor.
+ *
+ * Provide a factory that creates T objects given an EventBase, and get() will
+ * lazily create T objects based on an EventBase from the global IOExecutor.
+ * These are stored thread locally - for a given pair of event base and calling
+ * thread there will only be one T object created.
+ *
+ * The primary use case is for managing objects that need to do async IO on an
+ * event base (e.g. thrift clients) that can be used outside the IO thread
+ * without much hassle. For instance, you could use this to manage Thrift
+ * clients that are only ever called from within other threads without the
+ * calling thread needing to know anything about the IO threads that the clients
+ * will do their work on.
+ */
+template <class T>
+class IOObjectCache {
+ public:
+  typedef std::function<std::shared_ptr<T>(folly::EventBase*)> TFactory;
+
+  IOObjectCache() = default;
+  explicit IOObjectCache(TFactory factory) : factory_(std::move(factory)) {}
+
+  std::shared_ptr<T> get() {
+    CHECK(factory_);
+    auto eb = getIOExecutor()->getEventBase();
+    CHECK(eb);
+    auto it = cache_->find(eb);
+    if (it == cache_->end()) {
+      auto p = cache_->insert(std::make_pair(eb, factory_(eb)));
+      it = p.first;
+    }
+    return it->second;
+  };
+
+  void setFactory(TFactory factory) {
+    factory_ = std::move(factory);
+  }
+
+ private:
+  folly::ThreadLocal<std::map<folly::EventBase*, std::shared_ptr<T>>> cache_;
+  TFactory factory_;
+};
+
+} // namespace folly
diff --git a/folly/executors/IOThreadPoolExecutor.cpp b/folly/executors/IOThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..1cf9bc5
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/IOThreadPoolExecutor.h>
+
+#include <glog/logging.h>
+
+#include <folly/detail/MemoryIdler.h>
+
+namespace folly {
+
+using folly::detail::MemoryIdler;
+
+/* Class that will free jemalloc caches and madvise the stack away
+ * if the event loop is unused for some period of time
+ */
+class MemoryIdlerTimeout : public AsyncTimeout, public EventBase::LoopCallback {
+ public:
+  explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
+
+  void timeoutExpired() noexcept override {
+    idled = true;
+  }
+
+  void runLoopCallback() noexcept override {
+    if (idled) {
+      MemoryIdler::flushLocalMallocCaches();
+      MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
+
+      idled = false;
+    } else {
+      std::chrono::steady_clock::duration idleTimeout =
+          MemoryIdler::defaultIdleTimeout.load(std::memory_order_acquire);
+
+      idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
+
+      scheduleTimeout(
+          std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
+              .count());
+    }
+
+    // reschedule this callback for the next event loop.
+    base_->runBeforeLoop(this);
+  }
+
+ private:
+  EventBase* base_;
+  bool idled{false};
+};
+
+IOThreadPoolExecutor::IOThreadPoolExecutor(
+    size_t numThreads,
+    std::shared_ptr<ThreadFactory> threadFactory,
+    EventBaseManager* ebm,
+    bool waitForAll)
+    : ThreadPoolExecutor(numThreads, std::move(threadFactory), waitForAll),
+      nextThread_(0),
+      eventBaseManager_(ebm) {
+  setNumThreads(numThreads);
+}
+
+IOThreadPoolExecutor::~IOThreadPoolExecutor() {
+  stop();
+}
+
+void IOThreadPoolExecutor::add(Func func) {
+  add(std::move(func), std::chrono::milliseconds(0));
+}
+
+void IOThreadPoolExecutor::add(
+    Func func,
+    std::chrono::milliseconds expiration,
+    Func expireCallback) {
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  if (threadList_.get().empty()) {
+    throw std::runtime_error("No threads available");
+  }
+  auto ioThread = pickThread();
+
+  auto task = Task(std::move(func), expiration, std::move(expireCallback));
+  auto wrappedFunc = [ ioThread, task = std::move(task) ]() mutable {
+    runTask(ioThread, std::move(task));
+    ioThread->pendingTasks--;
+  };
+
+  ioThread->pendingTasks++;
+  if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
+    ioThread->pendingTasks--;
+    throw std::runtime_error("Unable to run func in event base thread");
+  }
+}
+
+std::shared_ptr<IOThreadPoolExecutor::IOThread>
+IOThreadPoolExecutor::pickThread() {
+  auto& me = *thisThread_;
+  auto& ths = threadList_.get();
+  // When new task is added to IOThreadPoolExecutor, a thread is chosen for it
+  // to be executed on, thisThread_ is by default chosen, however, if the new
+  // task is added by the clean up operations on thread destruction, thisThread_
+  // is not an available thread anymore, thus, always check whether or not
+  // thisThread_ is an available thread before choosing it.
+  if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
+    return me;
+  }
+  auto n = ths.size();
+  if (n == 0) {
+    return me;
+  }
+  auto thread = ths[nextThread_++ % n];
+  return std::static_pointer_cast<IOThread>(thread);
+}
+
+EventBase* IOThreadPoolExecutor::getEventBase() {
+  return pickThread()->eventBase;
+}
+
+EventBase* IOThreadPoolExecutor::getEventBase(
+    ThreadPoolExecutor::ThreadHandle* h) {
+  auto thread = dynamic_cast<IOThread*>(h);
+
+  if (thread) {
+    return thread->eventBase;
+  }
+
+  return nullptr;
+}
+
+EventBaseManager* IOThreadPoolExecutor::getEventBaseManager() {
+  return eventBaseManager_;
+}
+
+std::shared_ptr<ThreadPoolExecutor::Thread> IOThreadPoolExecutor::makeThread() {
+  return std::make_shared<IOThread>(this);
+}
+
+void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
+  this->threadPoolHook_.registerThread();
+
+  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
+  ioThread->eventBase = eventBaseManager_->getEventBase();
+  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
+
+  auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
+  ioThread->eventBase->runBeforeLoop(idler.get());
+
+  ioThread->eventBase->runInEventBaseThread(
+      [thread] { thread->startupBaton.post(); });
+  while (ioThread->shouldRun) {
+    ioThread->eventBase->loopForever();
+  }
+  if (isJoin_) {
+    while (ioThread->pendingTasks > 0) {
+      ioThread->eventBase->loopOnce();
+    }
+  }
+  idler.reset();
+  if (isWaitForAll_) {
+    // some tasks, like thrift asynchronous calls, create additional
+    // event base hookups, let's wait till all of them complete.
+    ioThread->eventBase->loop();
+  }
+
+  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
+  ioThread->eventBase = nullptr;
+  eventBaseManager_->clearEventBase();
+}
+
+// threadListLock_ is writelocked
+void IOThreadPoolExecutor::stopThreads(size_t n) {
+  std::vector<ThreadPtr> stoppedThreads;
+  stoppedThreads.reserve(n);
+  for (size_t i = 0; i < n; i++) {
+    const auto ioThread =
+        std::static_pointer_cast<IOThread>(threadList_.get()[i]);
+    for (auto& o : observers_) {
+      o->threadStopped(ioThread.get());
+    }
+    ioThread->shouldRun = false;
+    stoppedThreads.push_back(ioThread);
+    std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
+    if (ioThread->eventBase) {
+      ioThread->eventBase->terminateLoopSoon();
+    }
+  }
+  for (auto thread : stoppedThreads) {
+    stoppedThreads_.add(thread);
+    threadList_.remove(thread);
+  }
+}
+
+// threadListLock_ is readlocked
+uint64_t IOThreadPoolExecutor::getPendingTaskCountImpl(
+    const folly::RWSpinLock::ReadHolder&) {
+  uint64_t count = 0;
+  for (const auto& thread : threadList_.get()) {
+    auto ioThread = std::static_pointer_cast<IOThread>(thread);
+    size_t pendingTasks = ioThread->pendingTasks;
+    if (pendingTasks > 0 && !ioThread->idle) {
+      pendingTasks--;
+    }
+    count += pendingTasks;
+  }
+  return count;
+}
+
+} // namespace folly
diff --git a/folly/executors/IOThreadPoolExecutor.h b/folly/executors/IOThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..cf52822
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/IOExecutor.h>
+#include <folly/executors/ThreadPoolExecutor.h>
+#include <folly/io/async/EventBaseManager.h>
+
+namespace folly {
+
+/**
+ * A Thread Pool for IO bound tasks
+ *
+ * @note Uses event_fd for notification, and waking an epoll loop.
+ * There is one queue (NotificationQueue specifically) per thread/epoll.
+ * If the thread is already running and not waiting on epoll,
+ * we don't make any additional syscalls to wake up the loop,
+ * just put the new task in the queue.
+ * If any thread has been waiting for more than a few seconds,
+ * its stack is madvised away. Currently however tasks are scheduled round
+ * robin on the queues, so unless there is no work going on,
+ * this isn't very effective.
+ * Since there is one queue per thread, there is hardly any contention
+ * on the queues - so a simple spinlock around an std::deque is used for
+ * the tasks. There is no max queue size.
+ * By default, there is one thread per core - it usually doesn't make sense to
+ * have more IO threads than this, assuming they don't block.
+ *
+ * @note ::getEventBase() will return an EventBase you can schedule IO work on
+ * directly, chosen round-robin.
+ *
+ * @note N.B. For this thread pool, stop() behaves like join() because
+ * outstanding tasks belong to the event base and will be executed upon its
+ * destruction.
+ */
+class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
+ public:
+  explicit IOThreadPoolExecutor(
+      size_t numThreads,
+      std::shared_ptr<ThreadFactory> threadFactory =
+          std::make_shared<NamedThreadFactory>("IOThreadPool"),
+      folly::EventBaseManager* ebm = folly::EventBaseManager::get(),
+      bool waitForAll = false);
+
+  ~IOThreadPoolExecutor() override;
+
+  void add(Func func) override;
+  void add(
+      Func func,
+      std::chrono::milliseconds expiration,
+      Func expireCallback = nullptr) override;
+
+  folly::EventBase* getEventBase() override;
+
+  static folly::EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*);
+
+  folly::EventBaseManager* getEventBaseManager();
+
+ private:
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
+    IOThread(IOThreadPoolExecutor* pool)
+        : Thread(pool), shouldRun(true), pendingTasks(0) {}
+    std::atomic<bool> shouldRun;
+    std::atomic<size_t> pendingTasks;
+    folly::EventBase* eventBase;
+    std::mutex eventBaseShutdownMutex_;
+  };
+
+  ThreadPtr makeThread() override;
+  std::shared_ptr<IOThread> pickThread();
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
+
+  size_t nextThread_;
+  folly::ThreadLocal<std::shared_ptr<IOThread>> thisThread_;
+  folly::EventBaseManager* eventBaseManager_;
+};
+
+} // namespace folly
diff --git a/folly/executors/LifoSemMPMCQueue.h b/folly/executors/LifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..3a16da2
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class LifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  // Note: The queue pre-allocates all memory for max_capacity
+  explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {}
+
+  void add(T item) override {
+    switch (kBehavior) { // static
+      case QueueBehaviorIfFull::THROW:
+        if (!queue_.write(std::move(item))) {
+          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+        }
+        break;
+      case QueueBehaviorIfFull::BLOCK:
+        queue_.blockingWrite(std::move(item));
+        break;
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (!queue_.readIfNotEmpty(item)) {
+      sem_.wait();
+    }
+    return item;
+  }
+
+  size_t capacity() {
+    return queue_.capacity();
+  }
+
+  size_t size() override {
+    return queue_.size();
+  }
+
+ private:
+  folly::LifoSem sem_;
+  folly::MPMCQueue<T> queue_;
+};
+
+} // namespace folly
diff --git a/folly/executors/NamedThreadFactory.h b/folly/executors/NamedThreadFactory.h
new file mode 100644 (file)
index 0000000..bcd4a6b
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <string>
+#include <thread>
+
+#include <folly/Conv.h>
+#include <folly/Range.h>
+#include <folly/ThreadName.h>
+#include <folly/executors/ThreadFactory.h>
+
+namespace folly {
+
+class NamedThreadFactory : public ThreadFactory {
+ public:
+  explicit NamedThreadFactory(folly::StringPiece prefix)
+      : prefix_(prefix.str()), suffix_(0) {}
+
+  std::thread newThread(Func&& func) override {
+    auto thread = std::thread(std::move(func));
+    folly::setThreadName(
+        thread.native_handle(), folly::to<std::string>(prefix_, suffix_++));
+    return thread;
+  }
+
+  void setNamePrefix(folly::StringPiece prefix) {
+    prefix_ = prefix.str();
+  }
+
+  std::string getNamePrefix() {
+    return prefix_;
+  }
+
+ private:
+  std::string prefix_;
+  std::atomic<uint64_t> suffix_;
+};
+
+} // namespace folly
diff --git a/folly/executors/PriorityLifoSemMPMCQueue.h b/folly/executors/PriorityLifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..797287c
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Executor.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  // Note A: The queue pre-allocates all memory for max_capacity
+  // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
+  //         MID_PRI and HI_PRI are treated at the same priority level.
+  PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
+    queues_.reserve(numPriorities);
+    for (int8_t i = 0; i < numPriorities; i++) {
+      queues_.emplace_back(max_capacity);
+    }
+  }
+
+  uint8_t getNumPriorities() override {
+    return queues_.size();
+  }
+
+  // Add at medium priority by default
+  void add(T item) override {
+    addWithPriority(std::move(item), folly::Executor::MID_PRI);
+  }
+
+  void addWithPriority(T item, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    size_t queue = priority < 0
+        ? std::max(0, mid + priority)
+        : std::min(getNumPriorities() - 1, mid + priority);
+    CHECK_LT(queue, queues_.size());
+    switch (kBehavior) { // static
+      case QueueBehaviorIfFull::THROW:
+        if (!queues_[queue].write(std::move(item))) {
+          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+        }
+        break;
+      case QueueBehaviorIfFull::BLOCK:
+        queues_[queue].blockingWrite(std::move(item));
+        break;
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (true) {
+      if (nonBlockingTake(item)) {
+        return item;
+      }
+      sem_.wait();
+    }
+  }
+
+  bool nonBlockingTake(T& item) {
+    for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
+      if (it->readIfNotEmpty(item)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  size_t size() override {
+    size_t size = 0;
+    for (auto& q : queues_) {
+      size += q.size();
+    }
+    return size;
+  }
+
+  size_t sizeGuess() const {
+    size_t size = 0;
+    for (auto& q : queues_) {
+      size += q.sizeGuess();
+    }
+    return size;
+  }
+
+ private:
+  folly::LifoSem sem_;
+  std::vector<folly::MPMCQueue<T>> queues_;
+};
+
+} // namespace folly
diff --git a/folly/executors/PriorityThreadFactory.h b/folly/executors/PriorityThreadFactory.h
new file mode 100644 (file)
index 0000000..ed46dd3
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/ThreadFactory.h>
+
+#include <folly/portability/SysResource.h>
+#include <folly/portability/SysTime.h>
+
+namespace folly {
+
+/**
+ * A ThreadFactory that sets nice values for each thread.  The main
+ * use case for this class is if there are multiple
+ * CPUThreadPoolExecutors in a single process, or between multiple
+ * processes, where some should have a higher priority than the others.
+ *
+ * Note that per-thread nice values are not POSIX standard, but both
+ * pthreads and linux support per-thread nice.  The default linux
+ * scheduler uses these values to do smart thread prioritization.
+ * sched_priority function calls only affect real-time schedulers.
+ */
+class PriorityThreadFactory : public ThreadFactory {
+ public:
+  explicit PriorityThreadFactory(
+      std::shared_ptr<ThreadFactory> factory,
+      int priority)
+      : factory_(std::move(factory)), priority_(priority) {}
+
+  std::thread newThread(Func&& func) override {
+    int priority = priority_;
+    return factory_->newThread([ priority, func = std::move(func) ]() mutable {
+      if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
+        LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
+            strerror(errno);
+      }
+      func();
+    });
+  }
+
+ private:
+  std::shared_ptr<ThreadFactory> factory_;
+  int priority_;
+};
+
+} // folly
diff --git a/folly/executors/SerialExecutor.cpp b/folly/executors/SerialExecutor.cpp
new file mode 100644 (file)
index 0000000..9380d68
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerialExecutor.h"
+
+#include <mutex>
+#include <queue>
+
+#include <glog/logging.h>
+
+#include <folly/ExceptionString.h>
+
+namespace folly {
+
+class SerialExecutor::TaskQueueImpl {
+ public:
+  void add(Func&& func) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    queue_.push(std::move(func));
+  }
+
+  void run() {
+    std::unique_lock<std::mutex> lock(mutex_);
+
+    ++scheduled_;
+
+    if (scheduled_ > 1) {
+      return;
+    }
+
+    do {
+      DCHECK(!queue_.empty());
+      Func func = std::move(queue_.front());
+      queue_.pop();
+      lock.unlock();
+
+      try {
+        func();
+      } catch (std::exception const& ex) {
+        LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
+                   << folly::exceptionStr(ex);
+      } catch (...) {
+        LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
+                      "object";
+      }
+
+      // Destroy the function (and the data it captures) before we acquire the
+      // lock again.
+      func = {};
+
+      lock.lock();
+      --scheduled_;
+    } while (scheduled_);
+  }
+
+ private:
+  std::mutex mutex_;
+  std::size_t scheduled_{0};
+  std::queue<Func> queue_;
+};
+
+SerialExecutor::SerialExecutor(std::shared_ptr<folly::Executor> parent)
+    : parent_(std::move(parent)),
+      taskQueueImpl_(std::make_shared<TaskQueueImpl>()) {}
+
+void SerialExecutor::add(Func func) {
+  taskQueueImpl_->add(std::move(func));
+  parent_->add([impl = taskQueueImpl_] { impl->run(); });
+}
+
+void SerialExecutor::addWithPriority(Func func, int8_t priority) {
+  taskQueueImpl_->add(std::move(func));
+  parent_->addWithPriority([impl = taskQueueImpl_] { impl->run(); }, priority);
+}
+
+} // namespace folly
diff --git a/folly/executors/SerialExecutor.h b/folly/executors/SerialExecutor.h
new file mode 100644 (file)
index 0000000..c6c1cae
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include <folly/Executor.h>
+#include <folly/executors/GlobalExecutor.h>
+
+namespace folly {
+
+/**
+ * @class SerialExecutor
+ *
+ * @brief Executor that guarantees serial non-concurrent execution of added
+ *     tasks
+ *
+ * SerialExecutor is similar to boost asio's strand concept. A SerialExecutor
+ * has a parent executor which is given at construction time (defaults to
+ * folly's global CPUExecutor). Tasks added to SerialExecutor are executed
+ * in the parent executor, however strictly non-concurrently and in the order
+ * they were added.
+ *
+ * SerialExecutor tries to schedule its tasks fairly. Every task submitted to
+ * it results in one task submitted to the parent executor. Whenever the parent
+ * executor executes one of those, one of the tasks submitted to SerialExecutor
+ * is marked for execution, which means it will either be executed at once,
+ * or if a task is currently being executed already, after that.
+ *
+ * The SerialExecutor may be deleted at any time. All tasks that have been
+ * submitted will still be executed with the same guarantees, as long as the
+ * parent executor is executing tasks.
+ */
+
+class SerialExecutor : public folly::Executor {
+ public:
+  ~SerialExecutor() override = default;
+  SerialExecutor(SerialExecutor const&) = delete;
+  SerialExecutor& operator=(SerialExecutor const&) = delete;
+  SerialExecutor(SerialExecutor&&) = default;
+  SerialExecutor& operator=(SerialExecutor&&) = default;
+
+  explicit SerialExecutor(
+      std::shared_ptr<folly::Executor> parent = folly::getCPUExecutor());
+
+  /**
+   * Add one task for execution in the parent executor
+   */
+  void add(Func func) override;
+
+  /**
+   * Add one task for execution in the parent executor, and use the given
+   * priority for one task submission to parent executor.
+   *
+   * Since in-order execution of tasks submitted to SerialExecutor is
+   * guaranteed, the priority given here does not necessarily reflect the
+   * execution priority of the task submitted with this call to
+   * `addWithPriority`. The given priority is passed on to the parent executor
+   * for the execution of one of the SerialExecutor's tasks.
+   */
+  void addWithPriority(Func func, int8_t priority) override;
+  uint8_t getNumPriorities() const override {
+    return parent_->getNumPriorities();
+  }
+
+ private:
+  class TaskQueueImpl;
+
+  std::shared_ptr<folly::Executor> parent_;
+  std::shared_ptr<TaskQueueImpl> taskQueueImpl_;
+};
+
+} // namespace folly
diff --git a/folly/executors/ThreadFactory.h b/folly/executors/ThreadFactory.h
new file mode 100644 (file)
index 0000000..0af8632
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/Executor.h>
+
+#include <thread>
+
+namespace folly {
+
+class ThreadFactory {
+ public:
+  virtual ~ThreadFactory() = default;
+  virtual std::thread newThread(Func&& func) = 0;
+};
+
+} // namespace folly
diff --git a/folly/executors/ThreadPoolExecutor.cpp b/folly/executors/ThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..552d981
--- /dev/null
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/ThreadPoolExecutor.h>
+
+#include <folly/GlobalThreadPoolList.h>
+
+namespace folly {
+
+ThreadPoolExecutor::ThreadPoolExecutor(
+    size_t /* numThreads */,
+    std::shared_ptr<ThreadFactory> threadFactory,
+    bool isWaitForAll)
+    : threadFactory_(std::move(threadFactory)),
+      isWaitForAll_(isWaitForAll),
+      taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
+      threadPoolHook_("Wangle::ThreadPoolExecutor") {}
+
+ThreadPoolExecutor::~ThreadPoolExecutor() {
+  CHECK_EQ(0, threadList_.get().size());
+}
+
+ThreadPoolExecutor::Task::Task(
+    Func&& func,
+    std::chrono::milliseconds expiration,
+    Func&& expireCallback)
+    : func_(std::move(func)),
+      expiration_(expiration),
+      expireCallback_(std::move(expireCallback)),
+      context_(folly::RequestContext::saveContext()) {
+  // Assume that the task in enqueued on creation
+  enqueueTime_ = std::chrono::steady_clock::now();
+}
+
+void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
+  thread->idle = false;
+  auto startTime = std::chrono::steady_clock::now();
+  task.stats_.waitTime = startTime - task.enqueueTime_;
+  if (task.expiration_ > std::chrono::milliseconds(0) &&
+      task.stats_.waitTime >= task.expiration_) {
+    task.stats_.expired = true;
+    if (task.expireCallback_ != nullptr) {
+      task.expireCallback_();
+    }
+  } else {
+    folly::RequestContextScopeGuard rctx(task.context_);
+    try {
+      task.func_();
+    } catch (const std::exception& e) {
+      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
+                 << typeid(e).name() << " exception: " << e.what();
+    } catch (...) {
+      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
+                    "object";
+    }
+    task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
+  }
+  thread->idle = true;
+  thread->lastActiveTime = std::chrono::steady_clock::now();
+  thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
+    *thread->taskStatsCallbacks->inCallback = true;
+    SCOPE_EXIT {
+      *thread->taskStatsCallbacks->inCallback = false;
+    };
+    try {
+      for (auto& callback : callbacks) {
+        callback(task.stats_);
+      }
+    } catch (const std::exception& e) {
+      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
+                    "unhandled "
+                 << typeid(e).name() << " exception: " << e.what();
+    } catch (...) {
+      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
+                    "unhandled non-exception object";
+    }
+  });
+}
+
+size_t ThreadPoolExecutor::numThreads() {
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  return threadList_.get().size();
+}
+
+void ThreadPoolExecutor::setNumThreads(size_t n) {
+  size_t numThreadsToJoin = 0;
+  {
+    RWSpinLock::WriteHolder w{&threadListLock_};
+    const auto current = threadList_.get().size();
+    if (n > current) {
+      addThreads(n - current);
+    } else if (n < current) {
+      numThreadsToJoin = current - n;
+      removeThreads(numThreadsToJoin, true);
+    }
+  }
+  joinStoppedThreads(numThreadsToJoin);
+  CHECK_EQ(n, threadList_.get().size());
+  CHECK_EQ(0, stoppedThreads_.size());
+}
+
+// threadListLock_ is writelocked
+void ThreadPoolExecutor::addThreads(size_t n) {
+  std::vector<ThreadPtr> newThreads;
+  for (size_t i = 0; i < n; i++) {
+    newThreads.push_back(makeThread());
+  }
+  for (auto& thread : newThreads) {
+    // TODO need a notion of failing to create the thread
+    // and then handling for that case
+    thread->handle = threadFactory_->newThread(
+        std::bind(&ThreadPoolExecutor::threadRun, this, thread));
+    threadList_.add(thread);
+  }
+  for (auto& thread : newThreads) {
+    thread->startupBaton.wait();
+  }
+  for (auto& o : observers_) {
+    for (auto& thread : newThreads) {
+      o->threadStarted(thread.get());
+    }
+  }
+}
+
+// threadListLock_ is writelocked
+void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
+  CHECK_LE(n, threadList_.get().size());
+  isJoin_ = isJoin;
+  stopThreads(n);
+}
+
+void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
+  for (size_t i = 0; i < n; i++) {
+    auto thread = stoppedThreads_.take();
+    thread->handle.join();
+  }
+}
+
+void ThreadPoolExecutor::stop() {
+  size_t n = 0;
+  {
+    RWSpinLock::WriteHolder w{&threadListLock_};
+    n = threadList_.get().size();
+    removeThreads(n, false);
+  }
+  joinStoppedThreads(n);
+  CHECK_EQ(0, threadList_.get().size());
+  CHECK_EQ(0, stoppedThreads_.size());
+}
+
+void ThreadPoolExecutor::join() {
+  size_t n = 0;
+  {
+    RWSpinLock::WriteHolder w{&threadListLock_};
+    n = threadList_.get().size();
+    removeThreads(n, true);
+  }
+  joinStoppedThreads(n);
+  CHECK_EQ(0, threadList_.get().size());
+  CHECK_EQ(0, stoppedThreads_.size());
+}
+
+ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
+  const auto now = std::chrono::steady_clock::now();
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  ThreadPoolExecutor::PoolStats stats;
+  stats.threadCount = threadList_.get().size();
+  for (auto thread : threadList_.get()) {
+    if (thread->idle) {
+      stats.idleThreadCount++;
+      const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
+      stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
+    } else {
+      stats.activeThreadCount++;
+    }
+  }
+  stats.pendingTaskCount = getPendingTaskCountImpl(r);
+  stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount;
+  return stats;
+}
+
+uint64_t ThreadPoolExecutor::getPendingTaskCount() {
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  return getPendingTaskCountImpl(r);
+}
+
+std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
+
+void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
+  if (*taskStatsCallbacks_->inCallback) {
+    throw std::runtime_error("cannot subscribe in task stats callback");
+  }
+  taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
+}
+
+void ThreadPoolExecutor::StoppedThreadQueue::add(
+    ThreadPoolExecutor::ThreadPtr item) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  queue_.push(std::move(item));
+  sem_.post();
+}
+
+ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
+  while (1) {
+    {
+      std::lock_guard<std::mutex> guard(mutex_);
+      if (queue_.size() > 0) {
+        auto item = std::move(queue_.front());
+        queue_.pop();
+        return item;
+      }
+    }
+    sem_.wait();
+  }
+}
+
+size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  return queue_.size();
+}
+
+void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  observers_.push_back(o);
+  for (auto& thread : threadList_.get()) {
+    o->threadPreviouslyStarted(thread.get());
+  }
+}
+
+void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
+  RWSpinLock::ReadHolder r{&threadListLock_};
+  for (auto& thread : threadList_.get()) {
+    o->threadNotYetStopped(thread.get());
+  }
+
+  for (auto it = observers_.begin(); it != observers_.end(); it++) {
+    if (*it == o) {
+      observers_.erase(it);
+      return;
+    }
+  }
+  DCHECK(false);
+}
+
+} // namespace folly
diff --git a/folly/executors/ThreadPoolExecutor.h b/folly/executors/ThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..1e270b4
--- /dev/null
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/Baton.h>
+#include <folly/Executor.h>
+#include <folly/GlobalThreadPoolList.h>
+#include <folly/Memory.h>
+#include <folly/RWSpinLock.h>
+#include <folly/executors/LifoSemMPMCQueue.h>
+#include <folly/executors/NamedThreadFactory.h>
+#include <folly/io/async/Request.h>
+
+#include <algorithm>
+#include <mutex>
+#include <queue>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+class ThreadPoolExecutor : public virtual folly::Executor {
+ public:
+  explicit ThreadPoolExecutor(
+      size_t numThreads,
+      std::shared_ptr<ThreadFactory> threadFactory,
+      bool isWaitForAll = false);
+
+  ~ThreadPoolExecutor() override;
+
+  void add(Func func) override = 0;
+  virtual void
+  add(Func func, std::chrono::milliseconds expiration, Func expireCallback) = 0;
+
+  void setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory) {
+    CHECK(numThreads() == 0);
+    threadFactory_ = std::move(threadFactory);
+  }
+
+  std::shared_ptr<ThreadFactory> getThreadFactory(void) {
+    return threadFactory_;
+  }
+
+  size_t numThreads();
+  void setNumThreads(size_t numThreads);
+  /*
+   * stop() is best effort - there is no guarantee that unexecuted tasks won't
+   * be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
+   * behaves like join().
+   */
+  void stop();
+  void join();
+
+  struct PoolStats {
+    PoolStats()
+        : threadCount(0),
+          idleThreadCount(0),
+          activeThreadCount(0),
+          pendingTaskCount(0),
+          totalTaskCount(0),
+          maxIdleTime(0) {}
+    size_t threadCount, idleThreadCount, activeThreadCount;
+    uint64_t pendingTaskCount, totalTaskCount;
+    std::chrono::nanoseconds maxIdleTime;
+  };
+
+  PoolStats getPoolStats();
+  uint64_t getPendingTaskCount();
+
+  struct TaskStats {
+    TaskStats() : expired(false), waitTime(0), runTime(0) {}
+    bool expired;
+    std::chrono::nanoseconds waitTime;
+    std::chrono::nanoseconds runTime;
+  };
+
+  using TaskStatsCallback = std::function<void(TaskStats)>;
+  void subscribeToTaskStats(TaskStatsCallback cb);
+
+  /**
+   * Base class for threads created with ThreadPoolExecutor.
+   * Some subclasses have methods that operate on these
+   * handles.
+   */
+  class ThreadHandle {
+   public:
+    virtual ~ThreadHandle() = default;
+  };
+
+  /**
+   * Observer interface for thread start/stop.
+   * Provides hooks so actions can be taken when
+   * threads are created
+   */
+  class Observer {
+   public:
+    virtual void threadStarted(ThreadHandle*) = 0;
+    virtual void threadStopped(ThreadHandle*) = 0;
+    virtual void threadPreviouslyStarted(ThreadHandle* h) {
+      threadStarted(h);
+    }
+    virtual void threadNotYetStopped(ThreadHandle* h) {
+      threadStopped(h);
+    }
+    virtual ~Observer() = default;
+  };
+
+  void addObserver(std::shared_ptr<Observer>);
+  void removeObserver(std::shared_ptr<Observer>);
+
+ protected:
+  // Prerequisite: threadListLock_ writelocked
+  void addThreads(size_t n);
+  // Prerequisite: threadListLock_ writelocked
+  void removeThreads(size_t n, bool isJoin);
+
+  struct TaskStatsCallbackRegistry;
+
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread : public ThreadHandle {
+    explicit Thread(ThreadPoolExecutor* pool)
+        : id(nextId++),
+          handle(),
+          idle(true),
+          lastActiveTime(std::chrono::steady_clock::now()),
+          taskStatsCallbacks(pool->taskStatsCallbacks_) {}
+
+    ~Thread() override = default;
+
+    static std::atomic<uint64_t> nextId;
+    uint64_t id;
+    std::thread handle;
+    bool idle;
+    std::chrono::steady_clock::time_point lastActiveTime;
+    folly::Baton<> startupBaton;
+    std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks;
+  };
+
+  typedef std::shared_ptr<Thread> ThreadPtr;
+
+  struct Task {
+    explicit Task(
+        Func&& func,
+        std::chrono::milliseconds expiration,
+        Func&& expireCallback);
+    Func func_;
+    TaskStats stats_;
+    std::chrono::steady_clock::time_point enqueueTime_;
+    std::chrono::milliseconds expiration_;
+    Func expireCallback_;
+    std::shared_ptr<folly::RequestContext> context_;
+  };
+
+  static void runTask(const ThreadPtr& thread, Task&& task);
+
+  // The function that will be bound to pool threads. It must call
+  // thread->startupBaton.post() when it's ready to consume work.
+  virtual void threadRun(ThreadPtr thread) = 0;
+
+  // Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue
+  // and remove them from threadList_, either synchronize or asynchronize
+  // Prerequisite: threadListLock_ writelocked
+  virtual void stopThreads(size_t n) = 0;
+
+  // Join n stopped threads and remove them from waitingForJoinThreads_ queue.
+  // Should not hold a lock because joining thread operation may invoke some
+  // cleanup operations on the thread, and those cleanup operations may
+  // require a lock on ThreadPoolExecutor.
+  void joinStoppedThreads(size_t n);
+
+  // Create a suitable Thread struct
+  virtual ThreadPtr makeThread() {
+    return std::make_shared<Thread>(this);
+  }
+
+  // Prerequisite: threadListLock_ readlocked
+  virtual uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) = 0;
+
+  class ThreadList {
+   public:
+    void add(const ThreadPtr& state) {
+      auto it = std::lower_bound(
+          vec_.begin(),
+          vec_.end(),
+          state,
+          // compare method is a static method of class
+          // and therefore cannot be inlined by compiler
+          // as a template predicate of the STL algorithm
+          // but wrapped up with the lambda function (lambda will be inlined)
+          // compiler can inline compare method as well
+          [&](const ThreadPtr& ts1, const ThreadPtr& ts2) -> bool { // inline
+            return compare(ts1, ts2);
+          });
+      vec_.insert(it, state);
+    }
+
+    void remove(const ThreadPtr& state) {
+      auto itPair = std::equal_range(
+          vec_.begin(),
+          vec_.end(),
+          state,
+          // the same as above
+          [&](const ThreadPtr& ts1, const ThreadPtr& ts2) -> bool { // inline
+            return compare(ts1, ts2);
+          });
+      CHECK(itPair.first != vec_.end());
+      CHECK(std::next(itPair.first) == itPair.second);
+      vec_.erase(itPair.first);
+    }
+
+    const std::vector<ThreadPtr>& get() const {
+      return vec_;
+    }
+
+   private:
+    static bool compare(const ThreadPtr& ts1, const ThreadPtr& ts2) {
+      return ts1->id < ts2->id;
+    }
+
+    std::vector<ThreadPtr> vec_;
+  };
+
+  class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
+   public:
+    void add(ThreadPtr item) override;
+    ThreadPtr take() override;
+    size_t size() override;
+
+   private:
+    folly::LifoSem sem_;
+    std::mutex mutex_;
+    std::queue<ThreadPtr> queue_;
+  };
+
+  std::shared_ptr<ThreadFactory> threadFactory_;
+  const bool isWaitForAll_; // whether to wait till event base loop exits
+
+  ThreadList threadList_;
+  folly::RWSpinLock threadListLock_;
+  StoppedThreadQueue stoppedThreads_;
+  std::atomic<bool> isJoin_; // whether the current downsizing is a join
+
+  struct TaskStatsCallbackRegistry {
+    folly::ThreadLocal<bool> inCallback;
+    folly::Synchronized<std::vector<TaskStatsCallback>> callbackList;
+  };
+  std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks_;
+  std::vector<std::shared_ptr<Observer>> observers_;
+  folly::ThreadPoolListHook threadPoolHook_;
+};
+
+} // namespace folly
diff --git a/folly/executors/ThreadedExecutor.cpp b/folly/executors/ThreadedExecutor.cpp
new file mode 100644 (file)
index 0000000..d75e82b
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/ThreadedExecutor.h>
+
+#include <chrono>
+
+#include <glog/logging.h>
+
+#include <folly/ThreadName.h>
+#include <folly/executors/NamedThreadFactory.h>
+
+namespace folly {
+
+template <typename F>
+static auto with_unique_lock(std::mutex& m, F&& f) -> decltype(f()) {
+  std::unique_lock<std::mutex> lock(m);
+  return f();
+}
+
+ThreadedExecutor::ThreadedExecutor(std::shared_ptr<ThreadFactory> threadFactory)
+    : threadFactory_(std::move(threadFactory)) {
+  controlt_ = std::thread([this] { control(); });
+}
+
+ThreadedExecutor::~ThreadedExecutor() {
+  stopping_.store(true, std::memory_order_release);
+  notify();
+  controlt_.join();
+  CHECK(running_.empty());
+  CHECK(finished_.empty());
+}
+
+void ThreadedExecutor::add(Func func) {
+  CHECK(!stopping_.load(std::memory_order_acquire));
+  with_unique_lock(enqueuedm_, [&] { enqueued_.push_back(std::move(func)); });
+  notify();
+}
+
+std::shared_ptr<ThreadFactory> ThreadedExecutor::newDefaultThreadFactory() {
+  return std::make_shared<NamedThreadFactory>("Threaded");
+}
+
+void ThreadedExecutor::notify() {
+  with_unique_lock(controlm_, [&] { controls_ = true; });
+  controlc_.notify_one();
+}
+
+void ThreadedExecutor::control() {
+  folly::setThreadName("ThreadedCtrl");
+  auto looping = true;
+  while (looping) {
+    controlWait();
+    looping = controlPerformAll();
+  }
+}
+
+void ThreadedExecutor::controlWait() {
+  constexpr auto kMaxWait = std::chrono::seconds(10);
+  std::unique_lock<std::mutex> lock(controlm_);
+  controlc_.wait_for(lock, kMaxWait, [&] { return controls_; });
+  controls_ = false;
+}
+
+void ThreadedExecutor::work(Func& func) {
+  func();
+  auto id = std::this_thread::get_id();
+  with_unique_lock(finishedm_, [&] { finished_.push_back(id); });
+  notify();
+}
+
+void ThreadedExecutor::controlJoinFinishedThreads() {
+  std::deque<std::thread::id> finishedt;
+  with_unique_lock(finishedm_, [&] { std::swap(finishedt, finished_); });
+  for (auto id : finishedt) {
+    running_[id].join();
+    running_.erase(id);
+  }
+}
+
+void ThreadedExecutor::controlLaunchEnqueuedTasks() {
+  std::deque<Func> enqueuedt;
+  with_unique_lock(enqueuedm_, [&] { std::swap(enqueuedt, enqueued_); });
+  for (auto& f : enqueuedt) {
+    auto th = threadFactory_->newThread(
+        [ this, f = std::move(f) ]() mutable { work(f); });
+    auto id = th.get_id();
+    running_[id] = std::move(th);
+  }
+}
+
+bool ThreadedExecutor::controlPerformAll() {
+  auto stopping = stopping_.load(std::memory_order_acquire);
+  controlJoinFinishedThreads();
+  controlLaunchEnqueuedTasks();
+  return !stopping || !running_.empty();
+}
+}
diff --git a/folly/executors/ThreadedExecutor.h b/folly/executors/ThreadedExecutor.h
new file mode 100644 (file)
index 0000000..81c5fca
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include <folly/Executor.h>
+#include <folly/executors/ThreadFactory.h>
+
+namespace folly {
+
+/***
+ *  ThreadedExecutor
+ *
+ *  An executor for blocking tasks.
+ *
+ *  This executor runs each task in its own thread. It works well for tasks
+ *  which mostly sleep, but works poorly for tasks which mostly compute.
+ *
+ *  For each task given to the executor with `add`, the executor spawns a new
+ *  thread for that task, runs the task in that thread, and joins the thread
+ *  after the task has completed.
+ *
+ *  Spawning and joining task threads are done in the executor's internal
+ *  control thread. Calls to `add` put the tasks to be run into a queue, where
+ *  the control thread will find them.
+ *
+ *  There is currently no limitation on, or throttling of, concurrency.
+ *
+ *  This executor is not currently optimized for performance. For example, it
+ *  makes no attempt to re-use task threads. Rather, it exists primarily to
+ *  offload sleep-heavy tasks from the CPU executor, where they might otherwise
+ *  be run.
+ */
+class ThreadedExecutor : public virtual folly::Executor {
+ public:
+  explicit ThreadedExecutor(
+      std::shared_ptr<ThreadFactory> threadFactory = newDefaultThreadFactory());
+  ~ThreadedExecutor() override;
+
+  ThreadedExecutor(ThreadedExecutor const&) = delete;
+  ThreadedExecutor(ThreadedExecutor&&) = delete;
+
+  ThreadedExecutor& operator=(ThreadedExecutor const&) = delete;
+  ThreadedExecutor& operator=(ThreadedExecutor&&) = delete;
+
+  void add(Func func) override;
+
+ private:
+  static std::shared_ptr<ThreadFactory> newDefaultThreadFactory();
+
+  void notify();
+  void control();
+  void controlWait();
+  bool controlPerformAll();
+  void controlJoinFinishedThreads();
+  void controlLaunchEnqueuedTasks();
+
+  void work(Func& func);
+
+  std::shared_ptr<ThreadFactory> threadFactory_;
+
+  std::atomic<bool> stopping_{false};
+
+  std::mutex controlm_;
+  std::condition_variable controlc_;
+  bool controls_ = false;
+  std::thread controlt_;
+
+  std::mutex enqueuedm_;
+  std::deque<Func> enqueued_;
+
+  //  Accessed only by the control thread, so no synchronization.
+  std::map<std::thread::id, std::thread> running_;
+
+  std::mutex finishedm_;
+  std::deque<std::thread::id> finished_;
+};
+}
diff --git a/folly/executors/UnboundedBlockingQueue.h b/folly/executors/UnboundedBlockingQueue.h
new file mode 100644 (file)
index 0000000..3fb09b3
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/Synchronized.h>
+#include <folly/executors/BlockingQueue.h>
+#include <queue>
+
+namespace folly {
+
+// Warning: this is effectively just a std::deque wrapped in a single mutex
+// We are aiming to add a more performant concurrent unbounded queue in the
+// future, but this class is available if you must have an unbounded queue
+// and can tolerate any contention.
+template <class T>
+class UnboundedBlockingQueue : public BlockingQueue<T> {
+ public:
+  virtual ~UnboundedBlockingQueue() {}
+
+  void add(T item) override {
+    queue_.wlock()->push(std::move(item));
+    sem_.post();
+  }
+
+  T take() override {
+    while (true) {
+      {
+        auto ulockedQueue = queue_.ulock();
+        if (!ulockedQueue->empty()) {
+          auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
+          T item = std::move(wlockedQueue->front());
+          wlockedQueue->pop();
+          return item;
+        }
+      }
+      sem_.wait();
+    }
+  }
+
+  size_t size() override {
+    return queue_.rlock()->size();
+  }
+
+ private:
+  LifoSem sem_;
+  Synchronized<std::queue<T>> queue_;
+};
+
+} // namespace folly
diff --git a/folly/executors/test/AsyncTest.cpp b/folly/executors/test/AsyncTest.cpp
new file mode 100644 (file)
index 0000000..60f913f
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/Async.h>
+#include <folly/futures/ManualExecutor.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+
+TEST(AsyncFunc, manual_executor) {
+  auto x = std::make_shared<ManualExecutor>();
+  auto oldX = getCPUExecutor();
+  setCPUExecutor(x);
+  auto f = async([] { return 42; });
+  EXPECT_FALSE(f.isReady());
+  x->run();
+  EXPECT_EQ(42, f.value());
+  setCPUExecutor(oldX);
+}
+
+TEST(AsyncFunc, value_lambda) {
+  auto lambda = [] { return 42; };
+  auto future = async(lambda);
+  EXPECT_EQ(42, future.get());
+}
+
+TEST(AsyncFunc, void_lambda) {
+  auto lambda = [] { /*do something*/ return; };
+  auto future = async(lambda);
+  // Futures with a void returning function, return Unit type
+  EXPECT_EQ(typeid(Unit), typeid(future.get()));
+}
+
+TEST(AsyncFunc, moveonly_lambda) {
+  auto lambda = [] { return std::unique_ptr<int>(new int(42)); };
+  auto future = async(lambda);
+  EXPECT_EQ(42, *future.get());
+}
diff --git a/folly/executors/test/CodelTest.cpp b/folly/executors/test/CodelTest.cpp
new file mode 100644 (file)
index 0000000..7145001
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/Codel.h>
+#include <gtest/gtest.h>
+#include <chrono>
+#include <thread>
+
+using std::chrono::milliseconds;
+using std::this_thread::sleep_for;
+
+TEST(CodelTest, Basic) {
+  folly::Codel c;
+  std::this_thread::sleep_for(milliseconds(110));
+  // This interval is overloaded
+  EXPECT_FALSE(c.overloaded(milliseconds(100)));
+  std::this_thread::sleep_for(milliseconds(90));
+  // At least two requests must happen in an interval before they will fail
+  EXPECT_FALSE(c.overloaded(milliseconds(50)));
+  EXPECT_TRUE(c.overloaded(milliseconds(50)));
+  std::this_thread::sleep_for(milliseconds(110));
+  // Previous interval is overloaded, but 2ms isn't enough to fail
+  EXPECT_FALSE(c.overloaded(milliseconds(2)));
+  std::this_thread::sleep_for(milliseconds(90));
+  // 20 ms > target interval * 2
+  EXPECT_TRUE(c.overloaded(milliseconds(20)));
+}
+
+TEST(CodelTest, highLoad) {
+  folly::Codel c;
+  c.overloaded(milliseconds(40));
+  EXPECT_EQ(100, c.getLoad());
+}
+
+TEST(CodelTest, mediumLoad) {
+  folly::Codel c;
+  c.overloaded(milliseconds(20));
+  sleep_for(milliseconds(90));
+  // this is overloaded but this request shouldn't drop because it's not >
+  // slough timeout
+  EXPECT_FALSE(c.overloaded(milliseconds(8)));
+  EXPECT_GT(100, c.getLoad());
+}
+
+TEST(CodelTest, reducingLoad) {
+  folly::Codel c;
+  c.overloaded(milliseconds(20));
+  sleep_for(milliseconds(90));
+  EXPECT_FALSE(c.overloaded(milliseconds(4)));
+}
+
+TEST(CodelTest, oneRequestNoDrop) {
+  folly::Codel c;
+  EXPECT_FALSE(c.overloaded(milliseconds(20)));
+}
+
+TEST(CodelTest, getLoadSanity) {
+  folly::Codel c;
+  // should be 100% but leave a litte wiggle room.
+  c.overloaded(milliseconds(10));
+  EXPECT_LT(99, c.getLoad());
+  EXPECT_GT(101, c.getLoad());
+
+  // should be 70% but leave a litte wiggle room.
+  c.overloaded(milliseconds(7));
+  EXPECT_LT(60, c.getLoad());
+  EXPECT_GT(80, c.getLoad());
+
+  // should be 20% but leave a litte wiggle room.
+  c.overloaded(milliseconds(2));
+  EXPECT_LT(10, c.getLoad());
+  EXPECT_GT(30, c.getLoad());
+
+  // this test demonstrates how silly getLoad() is, but silly isn't
+  // necessarily useless
+}
diff --git a/folly/executors/test/FiberIOExecutorTest.cpp b/folly/executors/test/FiberIOExecutorTest.cpp
new file mode 100644 (file)
index 0000000..87d60c7
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <folly/executors/FiberIOExecutor.h>
+#include <folly/executors/IOThreadPoolExecutor.h>
+
+#include <folly/portability/GTest.h>
+
+namespace {
+
+class FiberIOExecutorTest : public testing::Test {};
+}
+
+TEST_F(FiberIOExecutorTest, event_base) {
+  auto tpe = std::make_shared<folly::IOThreadPoolExecutor>(1);
+  folly::FiberIOExecutor e(tpe);
+
+  ASSERT_NE(e.getEventBase(), nullptr);
+  ASSERT_EQ(e.getEventBase(), tpe->getEventBase());
+}
+
+TEST_F(FiberIOExecutorTest, basic_execution) {
+  auto tpe = std::make_shared<folly::IOThreadPoolExecutor>(1);
+  folly::FiberIOExecutor e(tpe);
+
+  // FiberIOExecutor should add tasks using the FiberManager mapped to the
+  // IOThreadPoolExecutor's event base.
+  folly::Baton<> baton;
+  bool inContext = false;
+
+  e.add([&]() {
+    inContext = folly::fibers::onFiber();
+    auto& lc = dynamic_cast<folly::fibers::EventBaseLoopController&>(
+        folly::fibers::getFiberManager(*e.getEventBase()).loopController());
+    auto& eb = lc.getEventBase()->getEventBase();
+    inContext =
+        inContext && &eb == folly::EventBaseManager::get()->getEventBase();
+    baton.post();
+  });
+  baton.wait();
+
+  ASSERT_TRUE(inContext);
+}
diff --git a/folly/executors/test/GlobalExecutorTest.cpp b/folly/executors/test/GlobalExecutorTest.cpp
new file mode 100644 (file)
index 0000000..06484cf
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/GlobalExecutor.h>
+#include <folly/executors/IOExecutor.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+
+TEST(GlobalExecutorTest, GlobalCPUExecutor) {
+  class DummyExecutor : public folly::Executor {
+   public:
+    void add(Func f) override {
+      f();
+      count++;
+    }
+    int count{0};
+  };
+
+  // The default CPU executor is a synchronous inline executor, lets verify
+  // that work we add is executed
+  auto count = 0;
+  auto f = [&]() { count++; };
+
+  // Don't explode, we should create the default global CPUExecutor lazily here.
+  getCPUExecutor()->add(f);
+  EXPECT_EQ(1, count);
+
+  {
+    auto dummy = std::make_shared<DummyExecutor>();
+    setCPUExecutor(dummy);
+    getCPUExecutor()->add(f);
+    // Make sure we were properly installed.
+    EXPECT_EQ(1, dummy->count);
+    EXPECT_EQ(2, count);
+  }
+
+  // Don't explode, we should restore the default global CPUExecutor because our
+  // weak reference to dummy has expired
+  getCPUExecutor()->add(f);
+  EXPECT_EQ(3, count);
+}
+
+TEST(GlobalExecutorTest, GlobalIOExecutor) {
+  class DummyExecutor : public IOExecutor {
+   public:
+    void add(Func) override {
+      count++;
+    }
+    folly::EventBase* getEventBase() override {
+      return nullptr;
+    }
+    int count{0};
+  };
+
+  auto f = []() {};
+
+  // Don't explode, we should create the default global IOExecutor lazily here.
+  getIOExecutor()->add(f);
+
+  {
+    auto dummy = std::make_shared<DummyExecutor>();
+    setIOExecutor(dummy);
+    getIOExecutor()->add(f);
+    // Make sure we were properly installed.
+    EXPECT_EQ(1, dummy->count);
+  }
+
+  // Don't explode, we should restore the default global IOExecutor because our
+  // weak reference to dummy has expired
+  getIOExecutor()->add(f);
+}
diff --git a/folly/executors/test/SerialExecutorTest.cpp b/folly/executors/test/SerialExecutorTest.cpp
new file mode 100644 (file)
index 0000000..1a132ef
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+
+#include <gtest/gtest.h>
+
+#include <folly/Baton.h>
+#include <folly/executors/CPUThreadPoolExecutor.h>
+#include <folly/executors/SerialExecutor.h>
+#include <folly/futures/InlineExecutor.h>
+
+using namespace std::chrono;
+using folly::SerialExecutor;
+
+namespace {
+void burnMs(uint64_t ms) {
+  /* sleep override */ std::this_thread::sleep_for(milliseconds(ms));
+}
+} // namespace
+
+void SimpleTest(std::shared_ptr<folly::Executor> const& parent) {
+  SerialExecutor executor(parent);
+
+  std::vector<int> values;
+  std::vector<int> expected;
+
+  for (int i = 0; i < 20; ++i) {
+    executor.add([i, &values] {
+      // make this extra vulnerable to concurrent execution
+      values.push_back(0);
+      burnMs(10);
+      values.back() = i;
+    });
+    expected.push_back(i);
+  }
+
+  // wait until last task has executed
+  folly::Baton<> finished_baton;
+  executor.add([&finished_baton] { finished_baton.post(); });
+  finished_baton.wait();
+
+  EXPECT_EQ(expected, values);
+}
+
+TEST(SerialExecutor, Simple) {
+  SimpleTest(std::make_shared<folly::CPUThreadPoolExecutor>(4));
+}
+TEST(SerialExecutor, SimpleInline) {
+  SimpleTest(std::make_shared<folly::InlineExecutor>());
+}
+
+// The Afterlife test only works with an asynchronous executor (not the
+// InlineExecutor), because we want execution of tasks to happen after we
+// destroy the SerialExecutor
+TEST(SerialExecutor, Afterlife) {
+  auto cpu_executor = std::make_shared<folly::CPUThreadPoolExecutor>(4);
+  auto executor = std::make_unique<SerialExecutor>(cpu_executor);
+
+  // block executor until we call start_baton.post()
+  folly::Baton<> start_baton;
+  executor->add([&start_baton] { start_baton.wait(); });
+
+  std::vector<int> values;
+  std::vector<int> expected;
+
+  for (int i = 0; i < 20; ++i) {
+    executor->add([i, &values] {
+      // make this extra vulnerable to concurrent execution
+      values.push_back(0);
+      burnMs(10);
+      values.back() = i;
+    });
+    expected.push_back(i);
+  }
+
+  folly::Baton<> finished_baton;
+  executor->add([&finished_baton] { finished_baton.post(); });
+
+  // destroy SerialExecutor
+  executor.reset();
+
+  // now kick off the tasks
+  start_baton.post();
+
+  // wait until last task has executed
+  finished_baton.wait();
+
+  EXPECT_EQ(expected, values);
+}
+
+void RecursiveAddTest(std::shared_ptr<folly::Executor> const& parent) {
+  SerialExecutor executor(parent);
+
+  folly::Baton<> finished_baton;
+
+  std::vector<int> values;
+  std::vector<int> expected = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}};
+
+  int i = 0;
+  std::function<void()> lambda = [&] {
+    if (i < 10) {
+      // make this extra vulnerable to concurrent execution
+      values.push_back(0);
+      burnMs(10);
+      values.back() = i;
+      executor.add(lambda);
+    } else if (i < 12) {
+      // Below we will post this lambda three times to the executor. When
+      // executed, the lambda will re-post itself during the first ten
+      // executions. Afterwards we do nothing twice (this else-branch), and
+      // then on the 13th execution signal that we are finished.
+    } else {
+      finished_baton.post();
+    }
+    ++i;
+  };
+
+  executor.add(lambda);
+  executor.add(lambda);
+  executor.add(lambda);
+
+  // wait until last task has executed
+  finished_baton.wait();
+
+  EXPECT_EQ(expected, values);
+}
+
+TEST(SerialExecutor, RecursiveAdd) {
+  RecursiveAddTest(std::make_shared<folly::CPUThreadPoolExecutor>(4));
+}
+TEST(SerialExecutor, RecursiveAddInline) {
+  RecursiveAddTest(std::make_shared<folly::InlineExecutor>());
+}
+
+TEST(SerialExecutor, ExecutionThrows) {
+  SerialExecutor executor(std::make_shared<folly::InlineExecutor>());
+
+  // an empty Func will throw std::bad_function_call when invoked,
+  // but SerialExecutor should catch that exception
+  executor.add(folly::Func{});
+}
diff --git a/folly/executors/test/ThreadPoolExecutorTest.cpp b/folly/executors/test/ThreadPoolExecutorTest.cpp
new file mode 100644 (file)
index 0000000..66388f9
--- /dev/null
@@ -0,0 +1,596 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+
+#include <folly/executors/CPUThreadPoolExecutor.h>
+#include <folly/executors/FutureExecutor.h>
+#include <folly/executors/IOThreadPoolExecutor.h>
+#include <folly/executors/LifoSemMPMCQueue.h>
+#include <folly/executors/PriorityThreadFactory.h>
+#include <folly/executors/ThreadPoolExecutor.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+using namespace std::chrono;
+
+static Func burnMs(uint64_t ms) {
+  return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
+}
+
+template <class TPE>
+static void basic() {
+  // Create and destroy
+  TPE tpe(10);
+}
+
+TEST(ThreadPoolExecutorTest, CPUBasic) {
+  basic<CPUThreadPoolExecutor>();
+}
+
+TEST(IOThreadPoolExecutorTest, IOBasic) {
+  basic<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resize() {
+  TPE tpe(100);
+  EXPECT_EQ(100, tpe.numThreads());
+  tpe.setNumThreads(50);
+  EXPECT_EQ(50, tpe.numThreads());
+  tpe.setNumThreads(150);
+  EXPECT_EQ(150, tpe.numThreads());
+}
+
+TEST(ThreadPoolExecutorTest, CPUResize) {
+  resize<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResize) {
+  resize<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void stop() {
+  TPE tpe(1);
+  std::atomic<int> completed(0);
+  auto f = [&]() {
+    burnMs(10)();
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.stop();
+  EXPECT_GT(1000, completed);
+}
+
+// IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
+// to the event base, will be executed upon its destruction, and cannot be
+// taken back.
+template <>
+void stop<IOThreadPoolExecutor>() {
+  IOThreadPoolExecutor tpe(1);
+  std::atomic<int> completed(0);
+  auto f = [&]() {
+    burnMs(10)();
+    completed++;
+  };
+  for (int i = 0; i < 10; i++) {
+    tpe.add(f);
+  }
+  tpe.stop();
+  EXPECT_EQ(10, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUStop) {
+  stop<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOStop) {
+  stop<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void join() {
+  TPE tpe(10);
+  std::atomic<int> completed(0);
+  auto f = [&]() {
+    burnMs(1)();
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.join();
+  EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUJoin) {
+  join<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOJoin) {
+  join<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resizeUnderLoad() {
+  TPE tpe(10);
+  std::atomic<int> completed(0);
+  auto f = [&]() {
+    burnMs(1)();
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.setNumThreads(5);
+  tpe.setNumThreads(15);
+  tpe.join();
+  EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
+  resizeUnderLoad<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
+  resizeUnderLoad<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void poolStats() {
+  folly::Baton<> startBaton, endBaton;
+  TPE tpe(1);
+  auto stats = tpe.getPoolStats();
+  EXPECT_EQ(1, stats.threadCount);
+  EXPECT_EQ(1, stats.idleThreadCount);
+  EXPECT_EQ(0, stats.activeThreadCount);
+  EXPECT_EQ(0, stats.pendingTaskCount);
+  EXPECT_EQ(0, tpe.getPendingTaskCount());
+  EXPECT_EQ(0, stats.totalTaskCount);
+  tpe.add([&]() {
+    startBaton.post();
+    endBaton.wait();
+  });
+  tpe.add([&]() {});
+  startBaton.wait();
+  stats = tpe.getPoolStats();
+  EXPECT_EQ(1, stats.threadCount);
+  EXPECT_EQ(0, stats.idleThreadCount);
+  EXPECT_EQ(1, stats.activeThreadCount);
+  EXPECT_EQ(1, stats.pendingTaskCount);
+  EXPECT_EQ(1, tpe.getPendingTaskCount());
+  EXPECT_EQ(2, stats.totalTaskCount);
+  endBaton.post();
+}
+
+TEST(ThreadPoolExecutorTest, CPUPoolStats) {
+  poolStats<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOPoolStats) {
+  poolStats<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void taskStats() {
+  TPE tpe(1);
+  std::atomic<int> c(0);
+  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
+    int i = c++;
+    EXPECT_LT(milliseconds(0), stats.runTime);
+    if (i == 1) {
+      EXPECT_LT(milliseconds(0), stats.waitTime);
+    }
+  });
+  tpe.add(burnMs(10));
+  tpe.add(burnMs(10));
+  tpe.join();
+  EXPECT_EQ(2, c);
+}
+
+TEST(ThreadPoolExecutorTest, CPUTaskStats) {
+  taskStats<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOTaskStats) {
+  taskStats<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void expiration() {
+  TPE tpe(1);
+  std::atomic<int> statCbCount(0);
+  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
+    int i = statCbCount++;
+    if (i == 0) {
+      EXPECT_FALSE(stats.expired);
+    } else if (i == 1) {
+      EXPECT_TRUE(stats.expired);
+    } else {
+      FAIL();
+    }
+  });
+  std::atomic<int> expireCbCount(0);
+  auto expireCb = [&]() { expireCbCount++; };
+  tpe.add(burnMs(10), seconds(60), expireCb);
+  tpe.add(burnMs(10), milliseconds(10), expireCb);
+  tpe.join();
+  EXPECT_EQ(2, statCbCount);
+  EXPECT_EQ(1, expireCbCount);
+}
+
+TEST(ThreadPoolExecutorTest, CPUExpiration) {
+  expiration<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOExpiration) {
+  expiration<IOThreadPoolExecutor>();
+}
+
+template <typename TPE>
+static void futureExecutor() {
+  FutureExecutor<TPE> fe(2);
+  std::atomic<int> c{0};
+  fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
+    c++;
+    EXPECT_EQ(42, t.value());
+  });
+  fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
+    c++;
+    EXPECT_EQ(100, t.value());
+  });
+  fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
+    c++;
+    EXPECT_NO_THROW(t.value());
+  });
+  fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
+    c++;
+    EXPECT_NO_THROW(t.value());
+  });
+  fe.addFuture([]() { throw std::runtime_error("oops"); })
+      .then([&](Try<Unit>&& t) {
+        c++;
+        EXPECT_THROW(t.value(), std::runtime_error);
+      });
+  // Test doing actual async work
+  folly::Baton<> baton;
+  fe.addFuture([&]() {
+      auto p = std::make_shared<Promise<int>>();
+      std::thread t([p]() {
+        burnMs(10)();
+        p->setValue(42);
+      });
+      t.detach();
+      return p->getFuture();
+    })
+      .then([&](Try<int>&& t) {
+        EXPECT_EQ(42, t.value());
+        c++;
+        baton.post();
+      });
+  baton.wait();
+  fe.join();
+  EXPECT_EQ(6, c);
+}
+
+TEST(ThreadPoolExecutorTest, CPUFuturePool) {
+  futureExecutor<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOFuturePool) {
+  futureExecutor<IOThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
+  bool tookLopri = false;
+  auto completed = 0;
+  auto hipri = [&] {
+    EXPECT_FALSE(tookLopri);
+    completed++;
+  };
+  auto lopri = [&] {
+    tookLopri = true;
+    completed++;
+  };
+  CPUThreadPoolExecutor pool(0, 2);
+  for (int i = 0; i < 50; i++) {
+    pool.addWithPriority(lopri, Executor::LO_PRI);
+  }
+  for (int i = 0; i < 50; i++) {
+    pool.addWithPriority(hipri, Executor::HI_PRI);
+  }
+  pool.setNumThreads(1);
+  pool.join();
+  EXPECT_EQ(100, completed);
+}
+
+class TestObserver : public ThreadPoolExecutor::Observer {
+ public:
+  void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
+    threads_++;
+  }
+  void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
+    threads_--;
+  }
+  void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
+    threads_++;
+  }
+  void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
+    threads_--;
+  }
+  void checkCalls() {
+    ASSERT_EQ(threads_, 0);
+  }
+
+ private:
+  std::atomic<int> threads_{0};
+};
+
+TEST(ThreadPoolExecutorTest, IOObserver) {
+  auto observer = std::make_shared<TestObserver>();
+
+  {
+    IOThreadPoolExecutor exe(10);
+    exe.addObserver(observer);
+    exe.setNumThreads(3);
+    exe.setNumThreads(0);
+    exe.setNumThreads(7);
+    exe.removeObserver(observer);
+    exe.setNumThreads(10);
+  }
+
+  observer->checkCalls();
+}
+
+TEST(ThreadPoolExecutorTest, CPUObserver) {
+  auto observer = std::make_shared<TestObserver>();
+
+  {
+    CPUThreadPoolExecutor exe(10);
+    exe.addObserver(observer);
+    exe.setNumThreads(3);
+    exe.setNumThreads(0);
+    exe.setNumThreads(7);
+    exe.removeObserver(observer);
+    exe.setNumThreads(10);
+  }
+
+  observer->checkCalls();
+}
+
+TEST(ThreadPoolExecutorTest, AddWithPriority) {
+  std::atomic_int c{0};
+  auto f = [&] { c++; };
+
+  // IO exe doesn't support priorities
+  IOThreadPoolExecutor ioExe(10);
+  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
+
+  CPUThreadPoolExecutor cpuExe(10, 3);
+  cpuExe.addWithPriority(f, -1);
+  cpuExe.addWithPriority(f, 0);
+  cpuExe.addWithPriority(f, 1);
+  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
+  cpuExe.addWithPriority(f, 2); // will add at the highest priority
+  cpuExe.addWithPriority(f, Executor::LO_PRI);
+  cpuExe.addWithPriority(f, Executor::HI_PRI);
+  cpuExe.join();
+
+  EXPECT_EQ(7, c);
+}
+
+TEST(ThreadPoolExecutorTest, BlockingQueue) {
+  std::atomic_int c{0};
+  auto f = [&] {
+    burnMs(1)();
+    c++;
+  };
+  const int kQueueCapacity = 1;
+  const int kThreads = 1;
+
+  auto queue = std::make_unique<LifoSemMPMCQueue<
+      CPUThreadPoolExecutor::CPUTask,
+      QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
+
+  CPUThreadPoolExecutor cpuExe(
+      kThreads,
+      std::move(queue),
+      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+  // Add `f` five times. It sleeps for 1ms every time. Calling
+  // `cppExec.add()` is *almost* guaranteed to block because there's
+  // only 1 cpu worker thread.
+  for (int i = 0; i < 5; i++) {
+    EXPECT_NO_THROW(cpuExe.add(f));
+  }
+  cpuExe.join();
+
+  EXPECT_EQ(5, c);
+}
+
+TEST(PriorityThreadFactoryTest, ThreadPriority) {
+  PriorityThreadFactory factory(
+      std::make_shared<NamedThreadFactory>("stuff"), 1);
+  int actualPriority = -21;
+  factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
+      .join();
+  EXPECT_EQ(1, actualPriority);
+}
+
+class TestData : public folly::RequestData {
+ public:
+  explicit TestData(int data) : data_(data) {}
+  ~TestData() override {}
+  int data_;
+};
+
+TEST(ThreadPoolExecutorTest, RequestContext) {
+  CPUThreadPoolExecutor executor(1);
+
+  RequestContextScopeGuard rctx; // create new request context for this scope
+  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
+  RequestContext::get()->setContextData(
+      "test", std::unique_ptr<TestData>(new TestData(42)));
+  auto data = RequestContext::get()->getContextData("test");
+  EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
+
+  executor.add([] {
+    auto data = RequestContext::get()->getContextData("test");
+    ASSERT_TRUE(data != nullptr);
+    EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
+  });
+}
+
+struct SlowMover {
+  explicit SlowMover(bool slow = false) : slow(slow) {}
+  SlowMover(SlowMover&& other) noexcept {
+    *this = std::move(other);
+  }
+  SlowMover& operator=(SlowMover&& other) noexcept {
+    slow = other.slow;
+    if (slow) {
+      /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
+    }
+    return *this;
+  }
+
+  bool slow;
+};
+
+TEST(ThreadPoolExecutorTest, BugD3527722) {
+  // Test that the queue does not get stuck if writes are completed in
+  // order opposite to how they are initiated.
+  LifoSemMPMCQueue<SlowMover> q(1024);
+  std::atomic<int> turn{};
+
+  std::thread consumer1([&] {
+    ++turn;
+    q.take();
+  });
+  std::thread consumer2([&] {
+    ++turn;
+    q.take();
+  });
+
+  std::thread producer1([&] {
+    ++turn;
+    while (turn < 4)
+      ;
+    ++turn;
+    q.add(SlowMover(true));
+  });
+  std::thread producer2([&] {
+    ++turn;
+    while (turn < 5)
+      ;
+    q.add(SlowMover(false));
+  });
+
+  producer1.join();
+  producer2.join();
+  consumer1.join();
+  consumer2.join();
+}
+
+template <typename TPE, typename ERR_T>
+static void ShutdownTest() {
+  // test that adding a .then() after we have
+  // started shutting down does not deadlock
+  folly::Optional<folly::Future<int>> f;
+  {
+    TPE fe(1);
+    f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
+      return 77;
+    });
+  }
+  EXPECT_THROW(f->get(), ERR_T);
+}
+
+TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
+  ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
+}
+
+TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
+  ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
+}
+
+template <typename TPE>
+static void removeThreadTest() {
+  // test that adding a .then() after we have removed some threads
+  // doesn't cause deadlock and they are executed on different threads
+  folly::Optional<folly::Future<int>> f;
+  std::thread::id id1, id2;
+  TPE fe(2);
+  f = folly::makeFuture()
+          .via(&fe)
+          .then([&id1]() {
+            burnMs(100)();
+            id1 = std::this_thread::get_id();
+          })
+          .then([&id2]() {
+            return 77;
+            id2 = std::this_thread::get_id();
+          });
+  fe.setNumThreads(1);
+
+  // future::then should be fulfilled because there is other thread available
+  EXPECT_EQ(77, f->get());
+  // two thread should be different because then part should be rescheduled to
+  // the other thread
+  EXPECT_NE(id1, id2);
+}
+
+TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
+  removeThreadTest<IOThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
+  removeThreadTest<CPUThreadPoolExecutor>();
+}
+
+template <typename TPE>
+static void resizeThreadWhileExecutingTest() {
+  TPE tpe(10);
+  EXPECT_EQ(10, tpe.numThreads());
+
+  std::atomic<int> completed(0);
+  auto f = [&]() {
+    burnMs(10)();
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.setNumThreads(8);
+  EXPECT_EQ(8, tpe.numThreads());
+  tpe.setNumThreads(5);
+  EXPECT_EQ(5, tpe.numThreads());
+  tpe.setNumThreads(15);
+  EXPECT_EQ(15, tpe.numThreads());
+  tpe.stop();
+  EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
+  resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
+  resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
+}
diff --git a/folly/executors/test/ThreadedExecutorTest.cpp b/folly/executors/test/ThreadedExecutorTest.cpp
new file mode 100644 (file)
index 0000000..7bdf4c0
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/ThreadedExecutor.h>
+
+#include <gtest/gtest.h>
+
+#include <folly/Conv.h>
+#include <folly/futures/Future.h>
+#include <folly/gen/Base.h>
+
+namespace {
+
+class ThreadedExecutorTest : public testing::Test {};
+}
+
+TEST_F(ThreadedExecutorTest, example) {
+  folly::ThreadedExecutor x;
+  auto ret = folly::via(&x)
+                 .then([&] { return 42; })
+                 .then([&](int n) { return folly::to<std::string>(n); })
+                 .get();
+
+  EXPECT_EQ("42", ret);
+}
+
+TEST_F(ThreadedExecutorTest, dtor_waits) {
+  constexpr auto kDelay = std::chrono::milliseconds(100);
+  auto x = std::make_unique<folly::ThreadedExecutor>();
+  auto fut = folly::via(&*x, [&] { /* sleep override */
+                                   std::this_thread::sleep_for(kDelay);
+  });
+  x = nullptr;
+
+  EXPECT_TRUE(fut.isReady());
+}
+
+TEST_F(ThreadedExecutorTest, many) {
+  constexpr auto kNumTasks = 1024;
+  folly::ThreadedExecutor x;
+  auto rets =
+      folly::collect(
+          folly::gen::range<size_t>(0, kNumTasks) |
+          folly::gen::map([&](size_t i) {
+            return folly::via(&x).then([=] { return i; }).then([](size_t k) {
+              return folly::to<std::string>(k);
+            });
+          }) |
+          folly::gen::as<std::vector>())
+          .get();
+
+  EXPECT_EQ("42", rets[42]);
+}
+
+TEST_F(ThreadedExecutorTest, many_sleeping_constant_time) {
+  constexpr auto kNumTasks = 256;
+  constexpr auto kDelay = std::chrono::milliseconds(100);
+  folly::ThreadedExecutor x;
+  auto rets =
+      folly::collect(
+          folly::gen::range<size_t>(0, kNumTasks) |
+          folly::gen::map([&](size_t i) {
+            return folly::via(&x)
+                .then([=] {
+                  /* sleep override */ std::this_thread::sleep_for(kDelay);
+                })
+                .then([=] { return i; })
+                .then([](size_t k) { return folly::to<std::string>(k); });
+          }) |
+          folly::gen::as<std::vector>())
+          .get();
+
+  EXPECT_EQ("42", rets[42]);
+}
+
+TEST_F(ThreadedExecutorTest, many_sleeping_decreasing_time) {
+  constexpr auto kNumTasks = 256;
+  constexpr auto kDelay = std::chrono::milliseconds(100);
+  folly::ThreadedExecutor x;
+  auto rets =
+      folly::collect(
+          folly::gen::range<size_t>(0, kNumTasks) |
+          folly::gen::map([&](size_t i) {
+            return folly::via(&x)
+                .then([=] {
+                  auto delay = kDelay * (kNumTasks - i) / kNumTasks;
+                  /* sleep override */ std::this_thread::sleep_for(delay);
+                })
+                .then([=] { return i; })
+                .then([](size_t k) { return folly::to<std::string>(k); });
+          }) |
+          folly::gen::as<std::vector>())
+          .get();
+
+  EXPECT_EQ("42", rets[42]);
+}
diff --git a/folly/executors/test/UnboundedBlockingQueueTest.cpp b/folly/executors/test/UnboundedBlockingQueueTest.cpp
new file mode 100644 (file)
index 0000000..203c1eb
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/Baton.h>
+#include <folly/executors/UnboundedBlockingQueue.h>
+#include <gtest/gtest.h>
+#include <thread>
+
+using namespace folly;
+
+TEST(UnboundedQueuee, push_pop) {
+  UnboundedBlockingQueue<int> q;
+  q.add(42);
+  EXPECT_EQ(42, q.take());
+}
+TEST(UnboundedBlockingQueue, size) {
+  UnboundedBlockingQueue<int> q;
+  EXPECT_EQ(0, q.size());
+  q.add(42);
+  EXPECT_EQ(1, q.size());
+  q.take();
+  EXPECT_EQ(0, q.size());
+}
+
+TEST(UnboundedBlockingQueue, concurrent_push_pop) {
+  UnboundedBlockingQueue<int> q;
+  Baton<> b1, b2;
+  std::thread t([&] {
+    b1.post();
+    EXPECT_EQ(42, q.take());
+    EXPECT_EQ(0, q.size());
+    b2.post();
+  });
+  b1.wait();
+  q.add(42);
+  b2.wait();
+  EXPECT_EQ(0, q.size());
+  t.join();
+}
index ec50fa8..d0e2693 100644 (file)
@@ -722,9 +722,9 @@ Although inspired by the C++11 std::future interface, it is not a drop-in replac
 <p><tt>via()</tt> wouldn&#039;t be of much use without practical implementations around. We have a handful, and here&#039;s a (possibly incomplete) list.</p>
 
 <ul>
-<li><a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/ThreadPoolExecutor.h" target="_blank">ThreadPoolExecutor</a> is an abstract thread pool implementation that supports resizing, custom thread factories, pool and per-task stats, NUMA awareness, user-defined task expiration, and Codel task expiration. It and its subclasses are under active development. It currently has two implementations:<ul>
-<li><a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/CPUThreadPoolExecutor.h" target="_blank">CPUThreadPoolExecutor</a> is a general purpose thread pool. In addition to the above features, it also supports task priorities.</li>
-<li><a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/IOThreadPoolExecutor.h" target="_blank">IOThreadPoolExecutor</a> is similar to CPUThreadPoolExecutor, but each thread spins on an EventBase (accessible to callbacks via <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBaseManager.h" target="_blank">EventBaseManager</a>)</li>
+<li><a href="https://github.com/facebook/folly/blob/master/folly/executors/ThreadPoolExecutor.h" target="_blank">ThreadPoolExecutor</a> is an abstract thread pool implementation that supports resizing, custom thread factories, pool and per-task stats, NUMA awareness, user-defined task expiration, and Codel task expiration. It and its subclasses are under active development. It currently has two implementations:<ul>
+<li><a href="https://github.com/facebook/folly/blob/master/folly/executors/CPUThreadPoolExecutor.h" target="_blank">CPUThreadPoolExecutor</a> is a general purpose thread pool. In addition to the above features, it also supports task priorities.</li>
+<li><a href="https://github.com/facebook/folly/blob/master/folly/executors/IOThreadPoolExecutor.h" target="_blank">IOThreadPoolExecutor</a> is similar to CPUThreadPoolExecutor, but each thread spins on an EventBase (accessible to callbacks via <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBaseManager.h" target="_blank">EventBaseManager</a>)</li>
 </ul></li>
 <li>folly&#039;s <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBase.h" target="_blank">EventBase</a> is an Executor and executes work as a callback in the event loop</li>
 <li><a href="https://github.com/facebook/folly/blob/master/folly/futures/ManualExecutor.h" target="_blank">ManualExecutor</a> only executes work when manually cranked. This is useful for testing.</li>
@@ -732,7 +732,7 @@ Although inspired by the C++11 std::future interface, it is not a drop-in replac
 <li><a href="https://github.com/facebook/folly/blob/master/folly/futures/QueuedImmediateExecutor.h" target="_blank">QueuedImmediateExecutor</a> is similar to InlineExecutor, but work added during callback execution will be queued instead of immediately executed</li>
 <li><a href="https://github.com/facebook/folly/blob/master/folly/futures/ScheduledExecutor.h" target="_blank">ScheduledExecutor</a> is a subinterface of Executor that supports scheduled (i.e. delayed) execution. There aren&#039;t many implementations yet, see <a class="remarkup-task" href="#" target="_blank">T5924392</a></li>
 <li>Thrift&#039;s <a href="https://github.com/facebook/fbthrift/blob/master/thrift/lib/cpp/concurrency/ThreadManager.h" target="_blank">ThreadManager</a> is an Executor but we aim to deprecate it in favor of the aforementioned CPUThreadPoolExecutor</li>
-<li><a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/FutureExecutor.h" target="_blank">FutureExecutor</a> wraps another Executor and provides <tt>Future&lt;T&gt; addFuture(F func)</tt> which returns a Future representing the result of func. This is equivalent to <tt>futures::async(executor, func)</tt> and the latter should probably be preferred.</li>
+<li><a href="https://github.com/facebook/folly/blob/master/folly/executors/FutureExecutor.h" target="_blank">FutureExecutor</a> wraps another Executor and provides <tt>Future&lt;T&gt; addFuture(F func)</tt> which returns a Future representing the result of func. This is equivalent to <tt>futures::async(executor, func)</tt> and the latter should probably be preferred.</li>
 </ul></section><section class="dex_document"><h1>Timeouts and related features</h1><p class="dex_introduction">Futures provide a number of timing-related features. Here's an overview.</p><h2 id="timing-implementation">Timing implementation <a href="#timing-implementation" class="headerLink">#</a></h2>
 
 <h3 id="timing-resolution">Timing resolution <a href="#timing-resolution" class="headerLink">#</a></h3>
@@ -1179,4 +1179,4 @@ The three laws refer to a different formulation of the axioms, in terms of the K
 
 <p>The tradeoff is memory. Each continuation has a stack, and that stack is usually fixed-size and has to be big enough to support whatever ordinary computation you might want to do on it. So each living continuation requires a relatively large amount of memory. If you know the number of continuations will be small, this might be a good fit. In particular, it might be faster, the code might read cleaner, and debugging stack traces might be much easier.</p>
 
-<p>Futures takes the middle road between callback hell and continuations, one which has been trodden and proved useful in other languages. It doesn&#039;t claim to be the best model for all situations. Use your tools wisely.</p></section></section>
\ No newline at end of file
+<p>Futures takes the middle road between callback hell and continuations, one which has been trodden and proved useful in other languages. It doesn&#039;t claim to be the best model for all situations. Use your tools wisely.</p></section></section>