(folly) QueuedImmediateExecutor
authorHans Fugal <fugalh@fb.com>
Wed, 4 Jun 2014 22:38:24 +0000 (15:38 -0700)
committerNicholas Ormrod <njormrod@fb.com>
Fri, 27 Jun 2014 22:05:29 +0000 (15:05 -0700)
Summary: Add the `QueuedImmediateExecutor` which behaves like `InlineExecutor` but with different (and usually better) ordering semantics for nested calls.
@override-unit-failures

Test Plan: unit tests

Reviewed By: davejwatson@fb.com

Subscribers: folly@lists, net-systems@, fugalh, exa

FB internal diff: D1364904

Tasks: 3789661

folly/wangle/Executor.h
folly/wangle/InlineExecutor.h
folly/wangle/QueuedImmediateExecutor.cpp [new file with mode: 0644]
folly/wangle/QueuedImmediateExecutor.h [new file with mode: 0644]
folly/wangle/test/ExecutorTest.cpp

index 8cbd089a76a9558ff4cf8507ffe25379bde68bd1..a248c28e19591ce5ebf65f9b303c28a582bfe66e 100644 (file)
 #pragma once
 
 #include <boost/noncopyable.hpp>
-#include <functional>
 #include <chrono>
+#include <functional>
+#include <memory>
 
 namespace folly { namespace wangle {
-  // Like an Rx Scheduler. We should probably rename it to match now that it
-  // has scheduling semantics too, but that's a codemod for another lazy
-  // summer afternoon.
+  /// An Executor accepts units of work with add(), which should be
+  /// threadsafe.
+  /// Like an Rx Scheduler. We should probably rename it to match now that it
+  /// has scheduling semantics too, but that's a codemod for another lazy
+  /// summer afternoon.
   class Executor : boost::noncopyable {
    public:
      typedef std::function<void()> Action;
@@ -38,6 +41,16 @@ namespace folly { namespace wangle {
      /// schedule variants must be threadsafe.
      virtual void add(Action&&) = 0;
 
+     /// A convenience function for shared_ptr to legacy functors.
+     ///
+     /// Sometimes you have a functor that is move-only, and therefore can't be
+     /// converted to a std::function (e.g. std::packaged_task). In that case,
+     /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
+     template <class P>
+     void addPtr(P fn) {
+       this->add([fn]() mutable { (*fn)(); });
+     }
+
      /// Alias for add() (for Rx consistency)
      void schedule(Action&& a) { add(std::move(a)); }
 
index 8c338b15cbf1c4985e298053768b7dba3716175b..f86e48bd494d08c3a49b0d8ae011111971df5d5f 100644 (file)
@@ -19,6 +19,9 @@
 
 namespace folly { namespace wangle {
 
+  /// When work is "queued", execute it immediately inline.
+  /// Usually when you think you want this, you actually want a
+  /// QueuedImmediateExecutor.
   class InlineExecutor : public Executor {
    public:
     void add(std::function<void()>&& f) override {
diff --git a/folly/wangle/QueuedImmediateExecutor.cpp b/folly/wangle/QueuedImmediateExecutor.cpp
new file mode 100644 (file)
index 0000000..2df5bd3
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueuedImmediateExecutor.h"
+#include "folly/ThreadLocal.h"
+#include <queue>
+
+namespace folly { namespace wangle {
+
+void QueuedImmediateExecutor::add(Action&& callback)
+{
+  thread_local std::queue<Action> q;
+
+  if (q.empty()) {
+    q.push(std::move(callback));
+    while (!q.empty()) {
+      q.front()();
+      q.pop();
+    }
+  } else {
+    q.push(callback);
+  }
+}
+
+}} // namespace
diff --git a/folly/wangle/QueuedImmediateExecutor.h b/folly/wangle/QueuedImmediateExecutor.h
new file mode 100644 (file)
index 0000000..40e0831
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "folly/wangle/Executor.h"
+
+namespace folly { namespace wangle {
+
+/**
+ * Runs inline like InlineExecutor, but with a queue so that any tasks added
+ * to this executor by one of its own callbacks will be queued instead of
+ * executed inline (nested). This is usually better behavior than Inline.
+ */
+class QueuedImmediateExecutor : public Executor {
+ public:
+  void add(Action&&) override;
+};
+
+}} // namespace
index 5230869f3154e1c233fdb8bcc29b85ca23ab07bc..423b70914e1788b4a120ca777e4b7e55ce83e839 100644 (file)
  */
 
 #include <gtest/gtest.h>
+#include "folly/wangle/InlineExecutor.h"
 #include "folly/wangle/ManualExecutor.h"
+#include "folly/wangle/QueuedImmediateExecutor.h"
 
-using namespace testing;
 using namespace folly::wangle;
 using namespace std::chrono;
+using namespace testing;
 
 TEST(ManualExecutor, runIsStable) {
   ManualExecutor x;
@@ -86,3 +88,53 @@ TEST(ManualExecutor, advanceNeg) {
   x.advance(microseconds(-1));
   EXPECT_EQ(count, 0);
 }
+
+TEST(Executor, InlineExecutor) {
+  InlineExecutor x;
+  size_t counter = 0;
+  x.add([&]{
+    x.add([&]{
+      EXPECT_EQ(counter++, 0);
+    });
+    EXPECT_EQ(counter++, 1);
+  });
+  EXPECT_EQ(counter, 2);
+}
+
+TEST(Executor, QueuedImmediateExecutor) {
+  QueuedImmediateExecutor x;
+  size_t counter = 0;
+  x.add([&]{
+    x.add([&]{
+      EXPECT_EQ(1, counter++);
+    });
+    EXPECT_EQ(0, counter++);
+  });
+  EXPECT_EQ(2, counter);
+}
+
+TEST(Executor, Runnable) {
+  InlineExecutor x;
+  size_t counter = 0;
+  struct Runnable {
+    std::function<void()> fn;
+    void operator()() { fn(); }
+  };
+  Runnable f;
+  f.fn = [&]{ counter++; };
+  x.add(f);
+  EXPECT_EQ(counter, 1);
+}
+
+TEST(Executor, RunnablePtr) {
+  InlineExecutor x;
+  struct Runnable {
+    std::function<void()> fn;
+    void operator()() { fn(); }
+  };
+  size_t counter = 0;
+  auto fnp = std::make_shared<Runnable>();
+  fnp->fn = [&]{ counter++; };
+  x.addPtr(fnp);
+  EXPECT_EQ(counter, 1);
+}