Add TimedDrivableExecutor to folly.
authorLee Howes <lwh@fb.com>
Fri, 19 Jan 2018 17:18:41 +0000 (09:18 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 19 Jan 2018 17:46:52 +0000 (09:46 -0800)
Summary: Adds a TimedDrivableExecutor implementation of DrivableExecutor that adds a driveUntil method. driveUntil is as drive, except that it takes a timepoint and will stop driving after that time to allow callers to time out more easily.

Reviewed By: yfeldblum

Differential Revision: D6658320

fbshipit-source-id: a75145748e78497ce107ae152f25729547883835

CMakeLists.txt
folly/Makefile.am
folly/executors/TimedDrivableExecutor.cpp [new file with mode: 0644]
folly/executors/TimedDrivableExecutor.h [new file with mode: 0644]
folly/executors/test/TimedDrivableExecutorTest.cpp [new file with mode: 0644]

index 6aa82b6..d581616 100755 (executable)
@@ -371,6 +371,7 @@ if (BUILD_TESTS)
       TEST serial_executor_test SOURCES SerialExecutorTest.cpp
       TEST thread_pool_executor_test SOURCES ThreadPoolExecutorTest.cpp
       TEST threaded_executor_test SOURCES ThreadedExecutorTest.cpp
+      TEST timed_drivable_executor_test SOURCES TimedDrivableExecutorTest.cpp
 
     DIRECTORY executors/task_queue/test/
       TEST unbounded_blocking_queue_test SOURCES UnboundedBlockingQueueTest.cpp
index 0dd44b1..66bb93e 100644 (file)
@@ -103,6 +103,7 @@ nobase_follyinclude_HEADERS = \
        executors/SerialExecutor.h \
        executors/ThreadPoolExecutor.h \
        executors/ThreadedExecutor.h \
+       executors/TimedDrivableExecutor.h \
        executors/task_queue/BlockingQueue.h \
        executors/task_queue/LifoSemMPMCQueue.h \
        executors/task_queue/PriorityLifoSemMPMCQueue.h \
@@ -541,6 +542,7 @@ libfolly_la_SOURCES = \
        executors/SerialExecutor.cpp \
        executors/ThreadPoolExecutor.cpp \
        executors/ThreadedExecutor.cpp \
+       executors/TimedDrivableExecutor.cpp \
        executors/QueuedImmediateExecutor.cpp \
        experimental/hazptr/hazptr.cpp \
        experimental/hazptr/memory_resource.cpp \
diff --git a/folly/executors/TimedDrivableExecutor.cpp b/folly/executors/TimedDrivableExecutor.cpp
new file mode 100644 (file)
index 0000000..4f1e21d
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/TimedDrivableExecutor.h>
+
+#include <cstring>
+#include <ctime>
+#include <string>
+#include <tuple>
+
+namespace folly {
+
+void TimedDrivableExecutor::add(Func callback) {
+  queue_.enqueue(std::move(callback));
+}
+
+void TimedDrivableExecutor::drive() {
+  wait();
+  run();
+}
+
+size_t TimedDrivableExecutor::run() {
+  size_t count = 0;
+  size_t n = queue_.size();
+
+  // If we have waited already, then func_ may have a value
+  if (func_) {
+    auto f = std::move(func_);
+    f();
+    count = 1;
+  }
+
+  while (count < n && queue_.try_dequeue(func_)) {
+    auto f = std::move(func_);
+    f();
+    ++count;
+  }
+
+  return count;
+}
+
+size_t TimedDrivableExecutor::drain() {
+  size_t tasksRun = 0;
+  size_t tasksForSingleRun = 0;
+  while ((tasksForSingleRun = run()) != 0) {
+    tasksRun += tasksForSingleRun;
+  }
+  return tasksRun;
+}
+
+void TimedDrivableExecutor::wait() {
+  if (!func_) {
+    queue_.dequeue(func_);
+  }
+}
+
+} // namespace folly
diff --git a/folly/executors/TimedDrivableExecutor.h b/folly/executors/TimedDrivableExecutor.h
new file mode 100644 (file)
index 0000000..f6e741b
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+#include <folly/concurrency/UnboundedQueue.h>
+#include <folly/executors/DrivableExecutor.h>
+
+namespace folly {
+
+/*
+ * A DrivableExecutor can be driven via its drive() method or its driveUntil()
+ * that drives until some time point.
+ */
+class TimedDrivableExecutor : public DrivableExecutor {
+ public:
+  /// Implements DrivableExecutor
+  void drive() override;
+
+  // Make progress if there is work to do and return true. Otherwise return
+  // false.
+  bool try_drive() {
+    return try_wait() && run() > 0;
+  }
+
+  // Make progress on this Executor's work. Acts as drive, except it will only
+  // wait for a period of timeout for work to be enqueued. If no work is
+  // enqueued by that point, it will return.
+  template <typename Rep, typename Period>
+  bool try_drive_for(const std::chrono::duration<Rep, Period>& timeout) {
+    return try_wait_for(timeout) && run() > 0;
+  }
+
+  // Make progress on this Executor's work. Acts as drive, except it will only
+  // wait until deadline for work to be enqueued. If no work is enqueued by
+  // that point, it will return.
+  template <typename Clock, typename Duration>
+  bool try_drive_until(
+      const std::chrono::time_point<Clock, Duration>& deadline) {
+    return try_wait_until(deadline) && run() > 0;
+  }
+
+  void add(Func) override;
+
+  /// Do work. Returns the number of functions that were executed (maybe 0).
+  /// Non-blocking, in the sense that we don't wait for work (we can't
+  /// control whether one of the functions blocks).
+  /// This is stable, it will not chase an ever-increasing tail of work.
+  /// This also means, there may be more work available to perform at the
+  /// moment that this returns.
+  size_t run();
+
+  // Do work until there is no more work to do.
+  // Returns the number of functions that were executed (maybe 0).
+  // Unlike run, this method is not stable. It will chase an infinite tail of
+  // work so should be used with care.
+  // There will be no work available to perform at the moment that this
+  // returns.
+  size_t drain();
+
+  /// Wait for work to do.
+  void wait();
+
+  // Return true if there is work to do, false otherwise
+  bool try_wait() {
+    return func_ || queue_.try_dequeue(func_);
+  }
+
+  /// Wait for work to do or for a period of timeout, whichever is sooner.
+  template <typename Rep, typename Period>
+  bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
+    return func_ || queue_.try_dequeue_for(func_, timeout);
+  }
+
+  /// Wait for work to do or until deadline passes, whichever is sooner.
+  template <typename Clock, typename Duration>
+  bool try_wait_until(
+      const std::chrono::time_point<Clock, Duration>& deadline) {
+    return func_ || queue_.try_dequeue_until(func_, deadline);
+  }
+
+ private:
+  UMPSCQueue<Func, true> queue_;
+  Func func_;
+};
+
+} // namespace folly
diff --git a/folly/executors/test/TimedDrivableExecutorTest.cpp b/folly/executors/test/TimedDrivableExecutorTest.cpp
new file mode 100644 (file)
index 0000000..fa3469e
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/TimedDrivableExecutor.h>
+
+#include <folly/futures/Future.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+using namespace folly;
+
+TEST(TimedDrivableExecutor, runIsStable) {
+  TimedDrivableExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  auto f2 = [&]() {
+    x.add(f1);
+    x.add(f1);
+  };
+  x.add(f2);
+  x.run();
+  EXPECT_EQ(count, 0);
+}
+
+TEST(TimedDrivableExecutor, drainIsNotStable) {
+  TimedDrivableExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  auto f2 = [&]() {
+    x.add(f1);
+    x.add(f1);
+  };
+  x.add(f2);
+  x.drain();
+  EXPECT_EQ(count, 2);
+}
+
+TEST(TimedDrivableExecutor, try_drive) {
+  TimedDrivableExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  x.try_drive();
+  EXPECT_EQ(count, 0);
+  x.add(f1);
+  x.try_drive();
+  EXPECT_EQ(count, 1);
+}
+
+TEST(TimedDrivableExecutor, try_drive_for) {
+  TimedDrivableExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  x.try_drive_for(std::chrono::milliseconds(100));
+  EXPECT_EQ(count, 0);
+  x.add(f1);
+  x.try_drive_for(std::chrono::milliseconds(100));
+  EXPECT_EQ(count, 1);
+}
+
+TEST(TimedDrivableExecutor, try_drive_until) {
+  TimedDrivableExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  x.try_drive_until(
+      std::chrono::system_clock::now() + std::chrono::milliseconds(100));
+  EXPECT_EQ(count, 0);
+  x.add(f1);
+  x.try_drive_until(
+      std::chrono::system_clock::now() + std::chrono::milliseconds(100));
+  EXPECT_EQ(count, 1);
+}