Scheduler interface of Executor
authorHans Fugal <fugalh@fb.com>
Thu, 19 Jun 2014 01:03:45 +0000 (18:03 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Thu, 26 Jun 2014 02:27:43 +0000 (19:27 -0700)
Summary: and ManualExecutor implementation

Test Plan: unit tests, contbuild

Reviewed By: davejwatson@fb.com

Subscribers: bmatheny, folly@lists, net-systems@, fugalh, exa, marccelani, jsedgwick

FB internal diff: D1392999

Tasks: 4548494

folly/wangle/Executor.h
folly/wangle/ManualExecutor.cpp
folly/wangle/ManualExecutor.h
folly/wangle/test/ExecutorTest.cpp [new file with mode: 0644]

index 29037b470b6ea770ff3c6b1de354b22e15ba7463..8cbd089a76a9558ff4cf8507ffe25379bde68bd1 100644 (file)
 
 #include <boost/noncopyable.hpp>
 #include <functional>
+#include <chrono>
 
 namespace folly { namespace wangle {
+  // Like an Rx Scheduler. We should probably rename it to match now that it
+  // has scheduling semantics too, but that's a codemod for another lazy
+  // summer afternoon.
   class Executor : boost::noncopyable {
    public:
+     typedef std::function<void()> Action;
+     // Reality is that better than millisecond resolution is very hard to
+     // achieve. However, we reserve the right to be incredible.
+     typedef std::chrono::microseconds Duration;
+     typedef std::chrono::steady_clock::time_point TimePoint;
+
      virtual ~Executor() = default;
-     virtual void add(std::function<void()>&&) = 0;
+
+     /// Enqueue an action to be performed by this executor. This and all
+     /// schedule variants must be threadsafe.
+     virtual void add(Action&&) = 0;
+
+     /// Alias for add() (for Rx consistency)
+     void schedule(Action&& a) { add(std::move(a)); }
+
+     /// Schedule an action to be executed after dur time has elapsed
+     /// Expect millisecond resolution at best.
+     void schedule(Action&& a, Duration const& dur) {
+       scheduleAt(std::move(a), now() + dur);
+     }
+
+     /// Schedule an action to be executed at time t, or as soon afterward as
+     /// possible. Expect millisecond resolution at best. Must be threadsafe.
+     virtual void scheduleAt(Action&& a, TimePoint const& t) {
+       throw std::logic_error("unimplemented");
+     }
+
+     /// Get this executor's notion of time. Must be threadsafe.
+     virtual TimePoint now() {
+       return std::chrono::steady_clock::now();
+     }
   };
 }}
index cb9e6aaf3000b2e65e578af32f8404378c6e58ad..eed8a84d597d7f34c5d0111d79ebdc03cf3dd766 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <string.h>
 #include <string>
+#include <tuple>
 
 #include <stdexcept>
 
@@ -31,24 +32,33 @@ ManualExecutor::ManualExecutor() {
 
 void ManualExecutor::add(std::function<void()>&& callback) {
   std::lock_guard<std::mutex> lock(lock_);
-  runnables_.push(callback);
+  actions_.push(callback);
   sem_post(&sem_);
 }
 
 size_t ManualExecutor::run() {
   size_t count;
   size_t n;
-  std::function<void()> runnable;
+  Action action;
 
   {
     std::lock_guard<std::mutex> lock(lock_);
-    n = runnables_.size();
+
+    while (!scheduledActions_.empty()) {
+      auto& sa = scheduledActions_.top();
+      if (sa.time > now_)
+        break;
+      actions_.push(sa.action);
+      scheduledActions_.pop();
+    }
+
+    n = actions_.size();
   }
 
   for (count = 0; count < n; count++) {
     {
       std::lock_guard<std::mutex> lock(lock_);
-      if (runnables_.empty()) {
+      if (actions_.empty()) {
         break;
       }
 
@@ -57,10 +67,10 @@ size_t ManualExecutor::run() {
       // This may fail (with EAGAIN), that's fine.
       sem_trywait(&sem_);
 
-      runnable = std::move(runnables_.front());
-      runnables_.pop();
+      action = std::move(actions_.front());
+      actions_.pop();
     }
-    runnable();
+    action();
   }
 
   return count;
@@ -70,7 +80,7 @@ void ManualExecutor::wait() {
   while (true) {
     {
       std::lock_guard<std::mutex> lock(lock_);
-      if (!runnables_.empty())
+      if (!actions_.empty())
         break;
     }
 
index 14c455faf0078be2b22f11d17e8be8cba810fb5d..e7fb8190935ebaead15355fe59c2e4c677149275 100644 (file)
 #include <memory>
 #include <mutex>
 #include <queue>
+#include <cstdio>
 
 namespace folly { namespace wangle {
-
+  /// A ManualExecutor only does work when you turn the crank, by calling
+  /// run() or indirectly with makeProgress() or waitFor().
+  ///
+  /// The clock for a manual executor starts at 0 and advances only when you
+  /// ask it to. i.e. time is also under manual control.
+  ///
+  /// NB No attempt has been made to make anything other than add and schedule
+  /// threadsafe.
   class ManualExecutor : public Executor {
    public:
     ManualExecutor();
 
-    void add(std::function<void()>&&) override;
+    void add(Action&&) override;
 
-    /// Do work. Returns the number of runnables that were executed (maybe 0).
-    /// Non-blocking.
+    /// Do work. Returns the number of actions that were executed (maybe 0).
+    /// Non-blocking, in the sense that we don't wait for work (we can't
+    /// control whether one of the actions blocks).
+    /// This is stable, it will not chase an ever-increasing tail of work.
+    /// This also means, there may be more work available to perform at the
+    /// moment that this returns.
     size_t run();
 
     /// Wait for work to do.
@@ -42,15 +54,64 @@ namespace folly { namespace wangle {
       run();
     }
 
+    /// makeProgress until this Future is ready.
     template <class F> void waitFor(F const& f) {
       while (!f.isReady())
         makeProgress();
     }
 
+    virtual void scheduleAt(Action&& a, TimePoint const& t) override {
+      std::lock_guard<std::mutex> lock(lock_);
+      scheduledActions_.emplace(t, std::move(a));
+      sem_post(&sem_);
+    }
+
+    /// Advance the clock. The clock never advances on its own.
+    /// Advancing the clock causes some work to be done, if work is available
+    /// to do (perhaps newly available because of the advanced clock).
+    /// If dur is <= 0 this is a noop.
+    void advance(Duration const& dur) {
+      advanceTo(now_ + dur);
+    }
+
+    /// Advance the clock to this absolute time. If t is <= now(),
+    /// this is a noop.
+    void advanceTo(TimePoint const& t) {
+      if (t > now_) {
+        now_ = t;
+      }
+      run();
+    }
+
+    TimePoint now() override { return now_; }
+
    private:
     std::mutex lock_;
-    std::queue<std::function<void()>> runnables_;
+    std::queue<Action> actions_;
     sem_t sem_;
+
+    // helper class to enable ordering of scheduled events in the priority
+    // queue
+    struct ScheduledAction {
+      TimePoint time;
+      size_t ordinal;
+      Action action;
+
+      ScheduledAction(TimePoint const& t, Action&& a)
+        : time(t), action(std::move(a))
+      {
+        static size_t seq = 0;
+        ordinal = seq++;
+      }
+
+      bool operator<(ScheduledAction const& b) const {
+        if (time == b.time)
+          return ordinal < b.ordinal;
+        return time < b.time;
+      }
+    };
+    std::priority_queue<ScheduledAction> scheduledActions_;
+    TimePoint now_ = now_.min();
   };
 
 }}
diff --git a/folly/wangle/test/ExecutorTest.cpp b/folly/wangle/test/ExecutorTest.cpp
new file mode 100644 (file)
index 0000000..5230869
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include "folly/wangle/ManualExecutor.h"
+
+using namespace testing;
+using namespace folly::wangle;
+using namespace std::chrono;
+
+TEST(ManualExecutor, runIsStable) {
+  ManualExecutor x;
+  size_t count = 0;
+  auto f1 = [&]() { count++; };
+  auto f2 = [&]() { x.add(f1); x.add(f1); };
+  x.add(f2);
+  x.run();
+}
+
+TEST(ManualExecutor, scheduleDur) {
+  ManualExecutor x;
+  size_t count = 0;
+  milliseconds dur {10};
+  x.schedule([&]{ count++; }, dur);
+  EXPECT_EQ(count, 0);
+  x.run();
+  EXPECT_EQ(count, 0);
+  x.advance(dur/2);
+  EXPECT_EQ(count, 0);
+  x.advance(dur/2);
+  EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, clockStartsAt0) {
+  ManualExecutor x;
+  EXPECT_EQ(x.now(), x.now().min());
+}
+
+TEST(ManualExecutor, scheduleAbs) {
+  ManualExecutor x;
+  size_t count = 0;
+  x.scheduleAt([&]{ count++; }, x.now() + milliseconds(10));
+  EXPECT_EQ(count, 0);
+  x.advance(milliseconds(10));
+  EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, advanceTo) {
+  ManualExecutor x;
+  size_t count = 0;
+  x.scheduleAt([&]{ count++; }, steady_clock::now());
+  EXPECT_EQ(count, 0);
+  x.advanceTo(steady_clock::now());
+  EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, advanceBack) {
+  ManualExecutor x;
+  size_t count = 0;
+  x.advance(microseconds(5));
+  x.schedule([&]{ count++; }, microseconds(6));
+  EXPECT_EQ(count, 0);
+  x.advanceTo(x.now() - microseconds(1));
+  EXPECT_EQ(count, 0);
+}
+
+TEST(ManualExecutor, advanceNeg) {
+  ManualExecutor x;
+  size_t count = 0;
+  x.advance(microseconds(5));
+  x.schedule([&]{ count++; }, microseconds(6));
+  EXPECT_EQ(count, 0);
+  x.advance(microseconds(-1));
+  EXPECT_EQ(count, 0);
+}