Simplify BackgroundThreads, move them to folly/experimental/ThreadedRepeatingFunction...
authorAlexey Spiridonov <lesha@fb.com>
Fri, 28 Apr 2017 22:52:16 +0000 (15:52 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 28 Apr 2017 23:10:23 +0000 (16:10 -0700)
Summary:
- `FunctionScheduler` and `EventBase` are great for sharing one thread for many functions, but one-function-per-thread is messy.
- Both of those implementations are complicated, but `FunctionThreads` is dead-simple.
- I made it even simpler by eliminating the former `incrementalSleep` in favor of `std::future::wait_for`, which allows instant interruption without a tweakable param. h/t aru777 for suggesting `std::future` instead of `std::condition_variable`.

Reviewed By: yfeldblum

Differential Revision: D4742134

fbshipit-source-id: b520bbcd5f218b2276200ffe8926722ae8a8d6ca

folly/Makefile.am
folly/experimental/FunctionScheduler.h
folly/experimental/ThreadedRepeatingFunctionRunner.cpp [new file with mode: 0644]
folly/experimental/ThreadedRepeatingFunctionRunner.h [new file with mode: 0644]
folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp [new file with mode: 0644]

index 8af497b..6d93dec 100644 (file)
@@ -104,6 +104,7 @@ nobase_follyinclude_HEADERS = \
        experimental/AtomicSharedPtr.h \
        experimental/detail/AtomicSharedPtr-detail.h \
        experimental/AutoTimer.h \
+       experimental/ThreadedRepeatingFunctionRunner.h \
        experimental/Bits.h \
        experimental/BitVectorCoding.h \
        experimental/DynamicParser.h \
@@ -536,6 +537,7 @@ libfolly_la_SOURCES = \
        Uri.cpp \
        Version.cpp \
        experimental/AsymmetricMemoryBarrier.cpp \
+       experimental/ThreadedRepeatingFunctionRunner.cpp \
        experimental/bser/Dump.cpp \
        experimental/bser/Load.cpp \
        experimental/DynamicParser.cpp \
index ba55fee..0c59084 100644 (file)
@@ -42,7 +42,9 @@ namespace folly {
  *
  *
  * Note: the class uses only one thread - if you want to use more than one
- *       thread use multiple FunctionScheduler objects
+ *       thread, either use multiple FunctionScheduler objects, or check out
+ *       ThreadedRepeatingFunctionRunner.h for a much simpler contract of
+ *       "run each function periodically in its own thread".
  *
  * start() schedules the functions, while shutdown() terminates further
  * scheduling.
diff --git a/folly/experimental/ThreadedRepeatingFunctionRunner.cpp b/folly/experimental/ThreadedRepeatingFunctionRunner.cpp
new file mode 100644 (file)
index 0000000..56a15c9
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/experimental/ThreadedRepeatingFunctionRunner.h"
+
+#include <glog/logging.h>
+#include <iostream>
+
+namespace folly {
+
+ThreadedRepeatingFunctionRunner::ThreadedRepeatingFunctionRunner() {}
+
+ThreadedRepeatingFunctionRunner::~ThreadedRepeatingFunctionRunner() {
+  stopAndWarn("ThreadedRepeatingFunctionRunner");
+}
+
+void ThreadedRepeatingFunctionRunner::stopAndWarn(
+    const std::string& class_of_destructor) {
+  if (stopImpl()) {
+    LOG(ERROR)
+        << "ThreadedRepeatingFunctionRunner::stop() should already have been "
+        << "called, since the " << class_of_destructor << " destructor is now "
+        << "running. This is unsafe because it means that its threads "
+        << "may be accessing class state that was already destroyed "
+        << "(e.g. derived class members, or members that were declared after "
+        << "the " << class_of_destructor << ") .";
+    stop();
+  }
+}
+
+void ThreadedRepeatingFunctionRunner::stop() {
+  stopImpl();
+}
+
+bool ThreadedRepeatingFunctionRunner::stopImpl() {
+  {
+    std::unique_lock<std::mutex> lock(stopMutex_);
+    if (stopping_) {
+      return false; // Do nothing if stop() is called twice.
+    }
+    stopping_ = true;
+  }
+  stopCv_.notify_all();
+  for (auto& t : threads_) {
+    t.join();
+  }
+  return true;
+}
+
+void ThreadedRepeatingFunctionRunner::add(
+    RepeatingFn fn,
+    std::chrono::milliseconds initialSleep) {
+  threads_.emplace_back(
+      &ThreadedRepeatingFunctionRunner::executeInLoop,
+      this,
+      std::move(fn),
+      initialSleep);
+}
+
+bool ThreadedRepeatingFunctionRunner::waitFor(
+    std::chrono::milliseconds duration) noexcept {
+  using clock = std::chrono::steady_clock;
+  const auto deadline = clock::now() + duration;
+  std::unique_lock<std::mutex> lock(stopMutex_);
+  stopCv_.wait_until(
+      lock, deadline, [&] { return stopping_ || clock::now() > deadline; });
+  return !stopping_;
+}
+
+void ThreadedRepeatingFunctionRunner::executeInLoop(
+    RepeatingFn fn,
+    std::chrono::milliseconds initialSleep) noexcept {
+  auto duration = initialSleep;
+  while (waitFor(duration)) {
+    duration = fn();
+  }
+}
+
+} // namespace folly
diff --git a/folly/experimental/ThreadedRepeatingFunctionRunner.h b/folly/experimental/ThreadedRepeatingFunctionRunner.h
new file mode 100644 (file)
index 0000000..878da72
--- /dev/null
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2015-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Function.h>
+#include <condition_variable>
+#include <thread>
+#include <vector>
+
+namespace folly {
+
+/**
+ * For each function `fn` you add to this object, `fn` will be run in a loop
+ * in its own thread, with the thread sleeping between invocations of `fn`
+ * for the duration returned by `fn`'s previous run.
+ *
+ * To clean up these threads, invoke `stop()`, which will interrupt sleeping
+ * threads.  `stop()` will wait for already-running functions to return.
+ *
+ * == Alternatives ==
+ *
+ * If you want to multiplex multiple functions on the same thread, you can
+ * either use EventBase with AsyncTimeout objects, or FunctionScheduler for
+ * a slightly simpler API.
+ *
+ * == Thread-safety ==
+ *
+ * This type follows the common rule that:
+ *  (1) const member functions are safe to call concurrently with const
+ *      member functions, but
+ *  (2) non-const member functions are not safe to call concurrently with
+ *      any member functions.
+ *
+ * == Pitfalls ==
+ *
+ * Threads and classes don't mix well in C++, so you have to be very careful
+ * if you want to have ThreadedRepeatingFunctionRunner as a member of your
+ * class.  A reasonable pattern looks like this:
+ *
+ * struct MyClass {
+ *   // Note that threads are NOT added in the constructor, for two reasons:
+ *   //
+ *   //   (1) If you added some, and had any subsequent initialization (e.g.
+ *   //       derived class constructors), 'this' would not be fully
+ *   //       constructed when the worker threads came up, causing
+ *   //       heisenbugs.
+ *   //
+ *   //   (2) Also, if your constructor threw after thread creation, the
+ *   //       class destructor would not be invoked, potentially leaving the
+ *   //       threads running too long.
+ *   //
+ *   // It's better to have explicit two-step initialization, or to lazily
+ *   // add threads the first time they are needed.
+ *   MyClass() : count_(0) {}
+ *
+ *   // You must stop the threads as early as possible in the destruction
+ *   // process (or even before).  In the case of a class hierarchy, the
+ *   // final class MUST always call stop() as the first thing in its
+ *   // destructor -- otherwise, the worker threads may access already-
+ *   // destroyed state.
+ *   ~MyClass() {
+ *     // if MyClass is abstract:
+ *     threads_.stopAndWarn("MyClass");
+ *     // Otherwise:
+ *     threads_.stop();
+ *   }
+ *
+ *   // See the constructor for why two-stage initialization is preferred.
+ *   void init() {
+ *     threads_.add(bind(&MyClass::incrementCount, this));
+ *   }
+ *
+ *   std::chrono::milliseconds incrementCount() {
+ *     ++count_;
+ *     return 10;
+ *   }
+ *
+ * private:
+ *   std::atomic<int> count_;
+ *   // Declared last because the threads' functions access other members.
+ *   ThreadedRepeatingFunctionRunner threads_;
+ * };
+ */
+class ThreadedRepeatingFunctionRunner final {
+ public:
+  // Returns how long to wait before the next repetition. Must not throw.
+  using RepeatingFn = folly::Function<std::chrono::milliseconds() noexcept>;
+
+  ThreadedRepeatingFunctionRunner();
+  ~ThreadedRepeatingFunctionRunner();
+
+  /**
+   * Ideally, you will call this before initiating the destruction of the
+   * host object.  Otherwise, this should be the first thing in the
+   * destruction sequence.  If it comes any later, worker threads may access
+   * class state that had already been destroyed.
+   */
+  void stop();
+
+  /**
+   * Must be called at the TOP of the destructor of any abstract class that
+   * contains ThreadedRepeatingFunctionRunner (directly or through a
+   * parent).  Any non-abstract class destructor must instead stop() at the
+   * top.
+   */
+  void stopAndWarn(const std::string& class_of_destructor);
+
+  /**
+   * Run your noexcept function `f` in a background loop, sleeping between
+   * calls for a duration returned by `f`.  Optionally waits for
+   * `initialSleep` before calling `f` for the first time.
+   *
+   * DANGER: If a non-final class has a ThreadedRepeatingFunctionRunner
+   * member (which, by the way, must be declared last in the class), then
+   * you must not call add() in your constructor.  Otherwise, your thread
+   * risks accessing uninitialized data belonging to a child class.  To
+   * avoid this design bug, prefer to use two-stage initialization to start
+   * your threads.
+   */
+  void add(
+      RepeatingFn f,
+      std::chrono::milliseconds initialSleep = std::chrono::milliseconds(0));
+
+  size_t size() const { return threads_.size(); }
+
+ private:
+  // Returns true if this is the first stop().
+  bool stopImpl();
+
+  // Sleep for a duration, or until stop() is called.
+  bool waitFor(std::chrono::milliseconds duration) noexcept;
+
+  // Noexcept allows us to get a good backtrace on crashes -- otherwise,
+  // std::terminate would get called **outside** of the thread function.
+  void executeInLoop(
+      RepeatingFn,
+      std::chrono::milliseconds initialSleep) noexcept;
+
+  std::mutex stopMutex_;
+  bool stopping_{false}; // protected by stopMutex_
+  std::condition_variable stopCv_;
+
+  std::vector<std::thread> threads_;
+};
+
+} // namespace folly
diff --git a/folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp b/folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp
new file mode 100644 (file)
index 0000000..28c79f3
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2015-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "folly/experimental/ThreadedRepeatingFunctionRunner.h"
+
+#include <folly/portability/GTest.h>
+#include <atomic>
+
+using namespace std;
+
+struct Foo {
+  explicit Foo(std::atomic<int>& d) : data(d) {}
+  ~Foo() {
+    runner_.stop();
+  }
+
+  void start() {
+    runner_.add([this]() {
+      ++data;
+      return std::chrono::seconds(0);
+    });
+  }
+
+  std::atomic<int>& data;
+  folly::ThreadedRepeatingFunctionRunner runner_; // Must be declared last
+};
+
+struct FooLongSleep {
+  explicit FooLongSleep(std::atomic<int>& d) : data(d) {}
+  ~FooLongSleep() {
+    runner_.stop();
+    data.store(-1);
+  }
+
+  void start() {
+    runner_.add([this]() {
+      data.store(1);
+      return 1000h; // Test would time out if we waited
+    });
+  }
+
+  std::atomic<int>& data;
+  folly::ThreadedRepeatingFunctionRunner runner_; // Must be declared last
+};
+
+TEST(TestThreadedRepeatingFunctionRunner, HandleBackgroundLoop) {
+  std::atomic<int> data(0);
+  {
+    Foo f(data);
+    EXPECT_EQ(0, data.load());
+    f.start(); // Runs increment thread in background
+    while (data.load() == 0) {
+      /* sleep override */ this_thread::sleep_for(chrono::milliseconds(10));
+    }
+  }
+  // The increment thread should have been destroyed
+  auto prev_val = data.load();
+  /* sleep override */ this_thread::sleep_for(chrono::milliseconds(100));
+  EXPECT_EQ(data.load(), prev_val);
+}
+
+TEST(TestThreadedRepeatingFunctionRunner, HandleLongSleepingThread) {
+  std::atomic<int> data(0);
+  {
+    FooLongSleep f(data);
+    EXPECT_EQ(0, data.load());
+    f.start();
+    while (data.load() == 0) {
+      /* sleep override */ this_thread::sleep_for(chrono::milliseconds(10));
+    }
+    EXPECT_EQ(1, data.load());
+  }
+  // Foo should have been destroyed, which stopped the thread!
+  EXPECT_EQ(-1, data.load());
+}