merge wangle/Executor.h and experimental/wangle/concurrent/Executor.h
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 15 Oct 2014 22:57:47 +0000 (15:57 -0700)
committerdcsommer <dcsommer@fb.com>
Fri, 17 Oct 2014 18:43:58 +0000 (11:43 -0700)
Summary:
the one in concurrent/ is a bit more generic, so I kept that as Executor and renamed the existing one ScheduledExecutor

because Hans is surfing I took the liberty of renaming Action->Func as an alias for std::function<void()>, because I think it's more reflective
also kept the version of add() that doesn't force rvalue-reference as it's more user friendly and probably not less performant in common cases (insert reference to "want speed? pass by value" here)

Test Plan: compiled some major relevant bits, will let contbuild show me anything I missed

Reviewed By: hans@fb.com

Subscribers: trunkagent, rushix, fbcode-common-diffs@, fugalh, msk, njormrod

FB internal diff: D1591237

Tasks: 5279196

folly/experimental/wangle/concurrent/Executor.h [deleted file]
folly/experimental/wangle/concurrent/ThreadFactory.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h
folly/wangle/Executor.h
folly/wangle/InlineExecutor.h
folly/wangle/ManualExecutor.cpp
folly/wangle/ManualExecutor.h
folly/wangle/QueuedImmediateExecutor.cpp
folly/wangle/QueuedImmediateExecutor.h
folly/wangle/ScheduledExecutor.h [new file with mode: 0644]

diff --git a/folly/experimental/wangle/concurrent/Executor.h b/folly/experimental/wangle/concurrent/Executor.h
deleted file mode 100644 (file)
index 2687ee6..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <functional>
-
-namespace folly { namespace wangle {
-
-typedef std::function<void()> Func;
-
-namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h
-
-class Executor {
- public:
-  virtual ~Executor() {};
-  virtual void add(Func func) = 0;
-};
-
-}
-
-}} // folly::wangle
index b5da075854e1f83f85ca843e5d5ecd3b4372552c..8f799065bbcaded3a3ba0a4104049a99642f7a12 100644 (file)
@@ -15,7 +15,7 @@
  */
 
 #pragma once
-#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/wangle/Executor.h>
 
 #include <thread>
 
index 88c25e88efa8e2fec62b066ea998048061b6a4b5..84b61051cefd6838a14a127e80955bce048911ee 100644 (file)
@@ -15,7 +15,7 @@
  */
 
 #pragma once
-#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/wangle/Executor.h>
 #include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
 #include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
 #include <folly/experimental/wangle/rx/Observable.h>
@@ -31,7 +31,7 @@
 
 namespace folly { namespace wangle {
 
-class ThreadPoolExecutor : public experimental::Executor {
+class ThreadPoolExecutor : public Executor {
  public:
   explicit ThreadPoolExecutor(
       size_t numThreads,
index 00a52b17b537445e0194439faa4a28e42803c702..cc9ce34db37e3ff56ba817cae5dbe9a21597535d 100644 (file)
 #pragma once
 
 #include <boost/noncopyable.hpp>
-#include <chrono>
 #include <functional>
-#include <memory>
-#include <stdexcept>
 
 namespace folly { namespace wangle {
+  typedef std::function<void()> Func;
+
   /// An Executor accepts units of work with add(), which should be
   /// threadsafe.
-  /// Like an Rx Scheduler. We should probably rename it to match now that it
-  /// has scheduling semantics too, but that's a codemod for another lazy
-  /// summer afternoon.
   class Executor : boost::noncopyable {
    public:
-     typedef std::function<void()> Action;
-     // Reality is that better than millisecond resolution is very hard to
-     // achieve. However, we reserve the right to be incredible.
-     typedef std::chrono::microseconds Duration;
-     typedef std::chrono::steady_clock::time_point TimePoint;
-
-     virtual ~Executor() = default;
-
-     /// Enqueue an action to be performed by this executor. This and all
-     /// schedule variants must be threadsafe.
-     virtual void add(Action&&) = 0;
-
-     /// A convenience function for shared_ptr to legacy functors.
-     ///
-     /// Sometimes you have a functor that is move-only, and therefore can't be
-     /// converted to a std::function (e.g. std::packaged_task). In that case,
-     /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
-     template <class P>
-     void addPtr(P fn) {
-       this->add([fn]() mutable { (*fn)(); });
-     }
-
-     /// Alias for add() (for Rx consistency)
-     void schedule(Action&& a) { add(std::move(a)); }
-
-     /// Schedule an action to be executed after dur time has elapsed
-     /// Expect millisecond resolution at best.
-     void schedule(Action&& a, Duration const& dur) {
-       scheduleAt(std::move(a), now() + dur);
-     }
-
-     /// Schedule an action to be executed at time t, or as soon afterward as
-     /// possible. Expect millisecond resolution at best. Must be threadsafe.
-     virtual void scheduleAt(Action&& a, TimePoint const& t) {
-       throw std::logic_error("unimplemented");
-     }
-
-     /// Get this executor's notion of time. Must be threadsafe.
-     virtual TimePoint now() {
-       return std::chrono::steady_clock::now();
-     }
+    virtual ~Executor() = default;
+
+    /// Enqueue a function to executed by this executor. This and all
+    /// variants must be threadsafe.
+    virtual void add(Func) = 0;
+
+    /// A convenience function for shared_ptr to legacy functors.
+    ///
+    /// Sometimes you have a functor that is move-only, and therefore can't be
+    /// converted to a std::function (e.g. std::packaged_task). In that case,
+    /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
+    template <class P>
+    void addPtr(P fn) {
+      this->add([fn]() mutable { (*fn)(); });
+    }
   };
 }}
index 48f82bdd382ea59d02316b0f2c9a0597007092d7..df9bfe29144087e7c0b0fe406a82be89a76e2026 100644 (file)
@@ -24,7 +24,7 @@ namespace folly { namespace wangle {
   /// QueuedImmediateExecutor.
   class InlineExecutor : public Executor {
    public:
-    void add(std::function<void()>&& f) override {
+    void add(Func f) override {
       f();
     }
   };
index 560475ccd5949d9efda64892179e91ff4a455a1a..4a2ee3a3438b64fcb9378b7bb872516674ee4d4d 100644 (file)
@@ -30,35 +30,35 @@ ManualExecutor::ManualExecutor() {
   }
 }
 
-void ManualExecutor::add(std::function<void()>&& callback) {
+void ManualExecutor::add(Func callback) {
   std::lock_guard<std::mutex> lock(lock_);
-  actions_.push(callback);
+  funcs_.push(std::move(callback));
   sem_post(&sem_);
 }
 
 size_t ManualExecutor::run() {
   size_t count;
   size_t n;
-  Action action;
+  Func func;
 
   {
     std::lock_guard<std::mutex> lock(lock_);
 
-    while (!scheduledActions_.empty()) {
-      auto& sa = scheduledActions_.top();
-      if (sa.time > now_)
+    while (!scheduledFuncs_.empty()) {
+      auto& sf = scheduledFuncs_.top();
+      if (sf.time > now_)
         break;
-      actions_.push(sa.action);
-      scheduledActions_.pop();
+      funcs_.push(sf.func);
+      scheduledFuncs_.pop();
     }
 
-    n = actions_.size();
+    n = funcs_.size();
   }
 
   for (count = 0; count < n; count++) {
     {
       std::lock_guard<std::mutex> lock(lock_);
-      if (actions_.empty()) {
+      if (funcs_.empty()) {
         break;
       }
 
@@ -67,10 +67,10 @@ size_t ManualExecutor::run() {
       // This may fail (with EAGAIN), that's fine.
       sem_trywait(&sem_);
 
-      action = std::move(actions_.front());
-      actions_.pop();
+      func = std::move(funcs_.front());
+      funcs_.pop();
     }
-    action();
+    func();
   }
 
   return count;
@@ -80,7 +80,7 @@ void ManualExecutor::wait() {
   while (true) {
     {
       std::lock_guard<std::mutex> lock(lock_);
-      if (!actions_.empty())
+      if (!funcs_.empty())
         break;
     }
 
index 76be4b4344206aac1d31731de431e24534afa0cb..27caf28cc7ed95eabe348a550f4610f4ddc24089 100644 (file)
@@ -15,7 +15,7 @@
  */
 
 #pragma once
-#include <folly/wangle/Executor.h>
+#include <folly/wangle/ScheduledExecutor.h>
 #include <semaphore.h>
 #include <memory>
 #include <mutex>
@@ -31,15 +31,15 @@ namespace folly { namespace wangle {
   ///
   /// NB No attempt has been made to make anything other than add and schedule
   /// threadsafe.
-  class ManualExecutor : public Executor {
+  class ManualExecutor : public ScheduledExecutor {
    public:
     ManualExecutor();
 
-    void add(Action&&) override;
+    void add(Func) override;
 
-    /// Do work. Returns the number of actions that were executed (maybe 0).
+    /// Do work. Returns the number of functions that were executed (maybe 0).
     /// Non-blocking, in the sense that we don't wait for work (we can't
-    /// control whether one of the actions blocks).
+    /// control whether one of the functions blocks).
     /// This is stable, it will not chase an ever-increasing tail of work.
     /// This also means, there may be more work available to perform at the
     /// moment that this returns.
@@ -60,9 +60,9 @@ namespace folly { namespace wangle {
         makeProgress();
     }
 
-    virtual void scheduleAt(Action&& a, TimePoint const& t) override {
+    virtual void scheduleAt(Func&& f, TimePoint const& t) override {
       std::lock_guard<std::mutex> lock(lock_);
-      scheduledActions_.emplace(t, std::move(a));
+      scheduledFuncs_.emplace(t, std::move(f));
       sem_post(&sem_);
     }
 
@@ -82,30 +82,30 @@ namespace folly { namespace wangle {
 
    private:
     std::mutex lock_;
-    std::queue<Action> actions_;
+    std::queue<Func> funcs_;
     sem_t sem_;
 
     // helper class to enable ordering of scheduled events in the priority
     // queue
-    struct ScheduledAction {
+    struct ScheduledFunc {
       TimePoint time;
       size_t ordinal;
-      Action action;
+      Func func;
 
-      ScheduledAction(TimePoint const& t, Action&& a)
-        : time(t), action(std::move(a))
+      ScheduledFunc(TimePoint const& t, Func&& f)
+        : time(t), func(std::move(f))
       {
         static size_t seq = 0;
         ordinal = seq++;
       }
 
-      bool operator<(ScheduledAction const& b) const {
+      bool operator<(ScheduledFunc const& b) const {
         if (time == b.time)
           return ordinal < b.ordinal;
         return time < b.time;
       }
     };
-    std::priority_queue<ScheduledAction> scheduledActions_;
+    std::priority_queue<ScheduledFunc> scheduledFuncs_;
     TimePoint now_ = now_.min();
   };
 
index 469d029643df83b144c98febfddb17ec93db95e0..739ba5ef820013f061e69779b8285795a75f58e7 100644 (file)
@@ -20,9 +20,8 @@
 
 namespace folly { namespace wangle {
 
-void QueuedImmediateExecutor::add(Action&& callback)
-{
-  thread_local std::queue<Action> q;
+void QueuedImmediateExecutor::add(Func callback) {
+  thread_local std::queue<Func> q;
 
   if (q.empty()) {
     q.push(std::move(callback));
index 46f14d4da2b13122183bf9ef38a63ac520886bd7..a4c4985df215e9e182db8950b57acc92b14630fd 100644 (file)
@@ -27,7 +27,7 @@ namespace folly { namespace wangle {
  */
 class QueuedImmediateExecutor : public Executor {
  public:
-  void add(Action&&) override;
+  void add(Func) override;
 };
 
 }} // namespace
diff --git a/folly/wangle/ScheduledExecutor.h b/folly/wangle/ScheduledExecutor.h
new file mode 100644 (file)
index 0000000..fe5f6f1
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/Executor.h>
+#include <chrono>
+#include <memory>
+#include <stdexcept>
+
+namespace folly { namespace wangle {
+  // An executor that supports timed scheduling. Like RxScheduler.
+  class ScheduledExecutor : public Executor {
+   public:
+     // Reality is that better than millisecond resolution is very hard to
+     // achieve. However, we reserve the right to be incredible.
+     typedef std::chrono::microseconds Duration;
+     typedef std::chrono::steady_clock::time_point TimePoint;
+
+     virtual ~ScheduledExecutor() = default;
+
+     virtual void add(Func) override = 0;
+
+     /// Alias for add() (for Rx consistency)
+     void schedule(Func&& a) { add(std::move(a)); }
+
+     /// Schedule a Func to be executed after dur time has elapsed
+     /// Expect millisecond resolution at best.
+     void schedule(Func&& a, Duration const& dur) {
+       scheduleAt(std::move(a), now() + dur);
+     }
+
+     /// Schedule a Func to be executed at time t, or as soon afterward as
+     /// possible. Expect millisecond resolution at best. Must be threadsafe.
+     virtual void scheduleAt(Func&& a, TimePoint const& t) {
+       throw std::logic_error("unimplemented");
+     }
+
+     /// Get this executor's notion of time. Must be threadsafe.
+     virtual TimePoint now() {
+       return std::chrono::steady_clock::now();
+     }
+  };
+}}