(wangle) Timeouts basic
authorHans Fugal <fugalh@fb.com>
Fri, 26 Dec 2014 23:18:11 +0000 (15:18 -0800)
committerDave Watson <davejwatson@fb.com>
Mon, 29 Dec 2014 18:40:32 +0000 (10:40 -0800)
Summary:
Add basic timeout functionality. This adds `futures::sleep` which makes an async Future that finishes after the given duration, and `Future::get` which blocks on the result and takes an optional timeout.

Introducing the folly::wangle::futures namespace (soon to be just folly::futures) which will hold our wangle utility functions, the things that live in the Future object in Twitter's scala code. We'll probably move when* and wait-ish methods in here too, and perhaps alias makeFuture-ish methods too, though James has me mostly convinced not to deprecate them at the folly::wangle level (because they're basically Future constructors and Future lives at folly::wangle)

`Future::delayed` after Twitter's helper of the same name

Test Plan: new and old unit tests

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, exa, folly-diffs@

FB internal diff: D1748894

Tasks: 4548494

Signature: t1:1748894:1419363496:1f4a62ec8455989c1fcce845695ace1d01c101c8

folly/wangle/futures/Future-inl.h
folly/wangle/futures/Future.cpp [new file with mode: 0644]
folly/wangle/futures/Future.h
folly/wangle/futures/Timekeeper.h [new file with mode: 0644]
folly/wangle/futures/WangleException.h
folly/wangle/futures/detail/Dummy.cpp [deleted file]
folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp [new file with mode: 0644]
folly/wangle/futures/detail/ThreadWheelTimekeeper.h [new file with mode: 0644]
folly/wangle/futures/detail/Types.h [new file with mode: 0644]
folly/wangle/futures/test/TimekeeperTest.cpp [new file with mode: 0644]

index f2816b487e694035d215c654b36c3aef9d93d77b..5b887bf8991447424604d08a6f59be8d2c571795 100644 (file)
 #include <chrono>
 #include <thread>
 
-#include <folly/wangle/futures/detail/Core.h>
 #include <folly/Baton.h>
+#include <folly/wangle/futures/detail/Core.h>
+#include <folly/wangle/futures/Timekeeper.h>
 
 namespace folly { namespace wangle {
 
+class Timekeeper;
+
+namespace detail {
+  Timekeeper* getTimekeeperSingleton();
+}
+
 template <typename T>
 struct isFuture {
   static const bool value = false;
@@ -670,9 +677,9 @@ inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
   return done;
 }
 
-template <typename T, class Duration>
+template <typename T, class Dur>
 Future<T>
-waitWithSemaphore(Future<T>&& f, Duration timeout) {
+waitWithSemaphore(Future<T>&& f, Dur timeout) {
   auto baton = std::make_shared<Baton<>>();
   auto done = f.then([baton](Try<T> &&t) {
     baton->post();
@@ -682,9 +689,9 @@ waitWithSemaphore(Future<T>&& f, Duration timeout) {
   return done;
 }
 
-template <class Duration>
+template <class Dur>
 Future<void>
-waitWithSemaphore(Future<void>&& f, Duration timeout) {
+waitWithSemaphore(Future<void>&& f, Dur timeout) {
   auto baton = std::make_shared<Baton<>>();
   auto done = f.then([baton](Try<void> &&t) {
     baton->post();
@@ -694,6 +701,93 @@ waitWithSemaphore(Future<void>&& f, Duration timeout) {
   return done;
 }
 
+namespace {
+  template <class T>
+  void getWaitHelper(Future<T>* f) {
+    // If we already have a value do the cheap thing
+    if (f->isReady()) {
+      return;
+    }
+
+    folly::Baton<> baton;
+    f->then([&](Try<T> const&) {
+      baton.post();
+    });
+    baton.wait();
+  }
+
+  template <class T>
+  Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
+    // TODO make and use variadic whenAny #5877971
+    Promise<T> p;
+    auto token = std::make_shared<std::atomic<bool>>();
+    folly::Baton<> baton;
+
+    folly::wangle::detail::getTimekeeperSingleton()->after(dur)
+      .then([&,token](Try<void> const& t) {
+        try {
+          t.value();
+          if (token->exchange(true) == false) {
+            p.setException(TimedOut());
+            baton.post();
+          }
+        } catch (std::exception const& e) {
+          if (token->exchange(true) == false) {
+            p.setException(std::current_exception());
+            baton.post();
+          }
+        }
+      });
+
+    f->then([&, token](Try<T>&& t) {
+      if (token->exchange(true) == false) {
+        p.fulfilTry(std::move(t));
+        baton.post();
+      }
+    });
+
+    baton.wait();
+    return p.getFuture();
+  }
+}
+
+template <class T>
+T Future<T>::get() {
+  getWaitHelper(this);
+
+  // Big assumption here: the then() call above, since it doesn't move out
+  // the value, leaves us with a value to return here. This would be a big
+  // no-no in user code, but I'm invoking internal developer privilege. This
+  // is slightly more efficient (save a move()) especially if there's an
+  // exception (save a throw).
+  return std::move(value());
+}
+
+template <>
+inline void Future<void>::get() {
+  getWaitHelper(this);
+}
+
+template <class T>
+T Future<T>::get(Duration dur) {
+  return std::move(getWaitTimeoutHelper(this, dur).value());
+}
+
+template <>
+inline void Future<void>::get(Duration dur) {
+  getWaitTimeoutHelper(this, dur).value();
+}
+
+template <class T>
+Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk)
+{
+  return whenAll(*this, futures::sleep(dur, tk))
+    .then([](Try<std::tuple<Try<T>, Try<void>>>&& tup) {
+      Try<T>& t = std::get<0>(tup.value());
+      return makeFuture<T>(std::move(t));
+    });
+}
+
 }}
 
 // I haven't included a Future<T&> specialization because I don't forsee us
diff --git a/folly/wangle/futures/Future.cpp b/folly/wangle/futures/Future.cpp
new file mode 100644 (file)
index 0000000..41bff82
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/futures/Future.h>
+#include <folly/wangle/futures/detail/ThreadWheelTimekeeper.h>
+#include <folly/Likely.h>
+
+namespace folly { namespace wangle { namespace futures {
+
+Future<void> sleep(Duration dur, Timekeeper* tk) {
+  if (LIKELY(!tk)) {
+    tk = detail::getTimekeeperSingleton();
+  }
+  return tk->after(dur);
+}
+
+}}}
index 15349e395158d2d7d0b2455236f671bb2e783c0c..a51c2949aa6ceb108d2b9aa1d25db7ea8b012782 100644 (file)
 #include <folly/MoveWrapper.h>
 #include <folly/wangle/futures/Promise.h>
 #include <folly/wangle/futures/Try.h>
+#include <folly/wangle/futures/WangleException.h>
+#include <folly/wangle/futures/detail/Types.h>
 
 namespace folly { namespace wangle {
 
+template <class> struct Promise;
+
 namespace detail {
 
 template <class> struct Core;
@@ -85,12 +89,34 @@ struct Extract<R(Class::*)(Args...)> {
   typedef typename ArgType<Args...>::FirstArg FirstArg;
 };
 
+
 } // detail
 
-template <class> struct Promise;
+struct Timekeeper;
 
 template <typename T> struct isFuture;
 
+/// This namespace is for utility functions that would usually be static
+/// members of Future, except they don't make sense there because they don't
+/// depend on the template type (rather, on the type of their arguments in
+/// some cases). This is the least-bad naming scheme we could think of. Some
+/// of the functions herein have really-likely-to-collide names, like "map"
+/// and "sleep".
+namespace futures {
+  /// Returns a Future that will complete after the specified duration. The
+  /// Duration typedef of a `std::chrono` duration type indicates the
+  /// resolution you can expect to be meaningful (milliseconds at the time of
+  /// writing). Normally you wouldn't need to specify a Timekeeper, we will
+  /// use the global wangle timekeeper (we run a thread whose job it is to
+  /// keep time for wangle timeouts) but we provide the option for power
+  /// users.
+  ///
+  /// The Timekeeper thread will be lazily created the first time it is
+  /// needed. If your program never uses any timeouts or other time-based
+  /// Futures you will pay no Timekeeper thread overhead.
+  Future<void> sleep(Duration, Timekeeper* = nullptr);
+}
+
 template <class T>
 class Future {
  public:
@@ -154,6 +180,15 @@ class Future {
   /** A reference to the Try of the value */
   Try<T>& getTry();
 
+  /// Block until the future is fulfilled. Returns the value (moved out), or
+  /// throws the exception. The future must not already have a callback.
+  T get();
+
+  /// Block until the future is fulfilled, or until timed out. Returns the
+  /// value (moved out), or throws the exception (which might be a TimedOut
+  /// exception).
+  T get(Duration dur);
+
   /** When this Future has completed, execute func which is a function that
     takes a Try<T>&&. A Future for the return type of func is
     returned. e.g.
@@ -427,6 +462,10 @@ class Future {
     raise(FutureCancellation());
   }
 
+  /// Delay the completion of this Future for at least this duration from
+  /// now. The optional Timekeeper is as with futures::sleep().
+  Future<T> delayed(Duration, Timekeeper* = nullptr);
+
  private:
   typedef detail::Core<T>* corePtr;
 
@@ -568,8 +607,8 @@ Future<T> waitWithSemaphore(Future<T>&& f);
  *
  * Note: each call to this starts a (short-lived) thread and allocates memory.
  */
-template <typename T, class Duration>
-Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
+template <typename T, class Dur>
+Future<T> waitWithSemaphore(Future<T>&& f, Dur timeout);
 
 }} // folly::wangle
 
diff --git a/folly/wangle/futures/Timekeeper.h b/folly/wangle/futures/Timekeeper.h
new file mode 100644 (file)
index 0000000..f765fab
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/futures/detail/Types.h>
+
+namespace folly { namespace wangle {
+
+template <class> struct Future;
+
+/// A Timekeeper handles the details of keeping time and fulfilling delay
+/// promises. The returned Future<void> will either complete after the
+/// elapsed time, or in the event of some kind of exceptional error may hold
+/// an exception. These Futures respond to cancellation. If you use a lot of
+/// Delays and many of them ultimately are unneeded (as would be the case for
+/// Delays that are used to trigger timeouts of async operations), then you
+/// can and should cancel them to reclaim resources.
+///
+/// Users will typically get one of these via Future::sleep(Duration) or
+/// use them implicitly behind the scenes by passing a timeout to some Future
+/// operation.
+///
+/// Although we don't formally alias Delay = Future<void>,
+/// that's an appropriate term for it. People will probably also call these
+/// Timeouts, and that's ok I guess, but that term is so overloaded I thought
+/// it made sense to introduce a cleaner term.
+///
+/// Remember that Duration is a std::chrono duration (millisecond resolution
+/// at the time of writing).
+class Timekeeper {
+ public:
+  virtual ~Timekeeper() = default;
+
+  /// Returns a future that will complete after the given duration with the
+  /// elapsed time. Exceptional errors can happen but they must be
+  /// exceptional. Use the steady (monotonic) clock.
+  ///
+  /// You may cancel this Future to reclaim resources.
+  ///
+  /// This future probably completes on the timer thread. You should almost
+  /// certainly follow it with a via() call or the accuracy of other timers
+  /// will suffer.
+  virtual Future<void> after(Duration) = 0;
+
+  /// Returns a future that will complete at the requested time.
+  ///
+  /// You may cancel this Future to reclaim resources.
+  ///
+  /// NB This is sugar for `after(when - now)`, so while you are welcome to
+  /// use a std::chrono::system_clock::time_point it will not track changes to
+  /// the system clock but rather execute that many milliseconds in the future
+  /// according to the steady clock.
+  template <class Clock>
+  Future<void> at(std::chrono::time_point<Clock> when);
+};
+
+}}
+
+// now get those definitions
+#include <folly/wangle/futures/Future.h>
+
+// finally we can use Future
+namespace folly { namespace wangle {
+
+  template <class Clock>
+  Future<void> Timekeeper::at(std::chrono::time_point<Clock> when) {
+    auto now = Clock::now();
+
+    if (when <= now) {
+      return makeFuture();
+    }
+
+    return after(when - now);
+  }
+
+}}
index bddf86c75157e02dd98ac43635c6e85fef950583..0d971303256e02fca6b93c72f4b34a9747eeb2d0 100644 (file)
@@ -86,4 +86,9 @@ class FutureCancellation : public WangleException {
   FutureCancellation() : WangleException("Future was cancelled") {}
 };
 
+class TimedOut : public WangleException {
+ public:
+  TimedOut() : WangleException("Timed out") {}
+};
+
 }}
diff --git a/folly/wangle/futures/detail/Dummy.cpp b/folly/wangle/futures/detail/Dummy.cpp
deleted file mode 100644 (file)
index 02a58d4..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// fbbuild is too dumb to know that .h files in the directory affect
-// our project, unless we have a .cpp file in the target, in the same
-// directory.
diff --git a/folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp b/folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp
new file mode 100644 (file)
index 0000000..d3c6315
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ThreadWheelTimekeeper.h"
+
+#include <folly/experimental/Singleton.h>
+#include <folly/wangle/futures/Future.h>
+#include <future>
+
+namespace folly { namespace wangle { namespace detail {
+
+namespace {
+  Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
+
+  // Our Callback object for HHWheelTimer
+  struct WTCallback : public folly::HHWheelTimer::Callback {
+    // Only allow creation by this factory, to ensure heap allocation.
+    static WTCallback* create() {
+      // optimization opportunity: memory pool
+      return new WTCallback();
+    }
+
+    Future<void> getFuture() {
+      return promise_.getFuture();
+    }
+
+   protected:
+    Promise<void> promise_;
+
+    explicit WTCallback() {
+      promise_.setInterruptHandler(
+        std::bind(&WTCallback::interruptHandler, this));
+    }
+
+    void timeoutExpired() noexcept override {
+      promise_.setValue();
+      delete this;
+    }
+
+    void interruptHandler() {
+      cancelTimeout();
+      delete this;
+    }
+  };
+
+} // namespace
+
+
+ThreadWheelTimekeeper::ThreadWheelTimekeeper() :
+  thread_([this]{ eventBase_.loopForever(); }),
+  wheelTimer_(new HHWheelTimer(&eventBase_, std::chrono::milliseconds(1)))
+{
+  eventBase_.waitUntilRunning();
+  eventBase_.runInEventBaseThread([this]{
+    // 15 characters max
+    eventBase_.setName("FutureTimekeepr");
+  });
+}
+
+ThreadWheelTimekeeper::~ThreadWheelTimekeeper() {
+  eventBase_.runInEventBaseThread([this]{
+    wheelTimer_->cancelAll();
+  });
+  eventBase_.terminateLoopSoon();
+  thread_.join();
+}
+
+Future<void> ThreadWheelTimekeeper::after(Duration dur) {
+  auto cob = WTCallback::create();
+  auto f = cob->getFuture();
+  eventBase_.runInEventBaseThread([=]{
+    wheelTimer_->scheduleTimeout(cob, dur);
+  });
+  return f;
+}
+
+Timekeeper* getTimekeeperSingleton() {
+  return timekeeperSingleton_.get_fast();
+}
+
+}}}
diff --git a/folly/wangle/futures/detail/ThreadWheelTimekeeper.h b/folly/wangle/futures/detail/ThreadWheelTimekeeper.h
new file mode 100644 (file)
index 0000000..35e895d
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/futures/Future.h>
+#include <folly/wangle/futures/Timekeeper.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <thread>
+
+namespace folly { namespace wangle { namespace detail {
+
+/// The default Timekeeper implementation which uses a HHWheelTimer on an
+/// EventBase in a dedicated thread. Users needn't deal with this directly, it
+/// is used by default by Future methods that work with timeouts.
+class ThreadWheelTimekeeper : public Timekeeper {
+ public:
+  /// But it doesn't *have* to be a singleton.
+  ThreadWheelTimekeeper();
+  ~ThreadWheelTimekeeper();
+
+  /// Implement the Timekeeper interface
+  /// This future *does* complete on the timer thread. You should almost
+  /// certainly follow it with a via() call or the accuracy of other timers
+  /// will suffer.
+  Future<void> after(Duration) override;
+
+ protected:
+  folly::EventBase eventBase_;
+  std::thread thread_;
+  HHWheelTimer::UniquePtr wheelTimer_;
+};
+
+Timekeeper* getTimekeeperSingleton();
+
+}}}
diff --git a/folly/wangle/futures/detail/Types.h b/folly/wangle/futures/detail/Types.h
new file mode 100644 (file)
index 0000000..57b93e0
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+namespace folly { namespace wangle {
+
+using Duration = std::chrono::milliseconds;
+
+}}
diff --git a/folly/wangle/futures/test/TimekeeperTest.cpp b/folly/wangle/futures/test/TimekeeperTest.cpp
new file mode 100644 (file)
index 0000000..5d2a359
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/wangle/futures/Timekeeper.h>
+#include <unistd.h>
+
+using namespace folly::wangle;
+using namespace std::chrono;
+using folly::wangle::Timekeeper;
+using Duration = folly::wangle::Duration;
+
+std::chrono::milliseconds const one_ms(1);
+std::chrono::milliseconds const awhile(10);
+
+std::chrono::steady_clock::time_point now() {
+  return std::chrono::steady_clock::now();
+}
+
+struct TimekeeperFixture : public testing::Test {
+  TimekeeperFixture() :
+    timeLord_(folly::wangle::detail::getTimekeeperSingleton())
+  {}
+
+  Timekeeper* timeLord_;
+};
+
+TEST_F(TimekeeperFixture, after) {
+  Duration waited(0);
+
+  auto t1 = now();
+  auto f = timeLord_->after(awhile);
+  EXPECT_FALSE(f.isReady());
+  f.get();
+  auto t2 = now();
+
+  EXPECT_GE(t2 - t1, awhile);
+}
+
+TEST(Timekeeper, futureGet) {
+  Promise<int> p;
+  std::thread([&]{ p.setValue(42); }).detach();
+  EXPECT_EQ(42, p.getFuture().get());
+}
+
+TEST(Timekeeper, futureGetBeforeTimeout) {
+  Promise<int> p;
+  std::thread([&]{ p.setValue(42); }).detach();
+  // Technically this is a race and if the test server is REALLY overloaded
+  // and it takes more than a second to do that thread it could be flaky. But
+  // I want a low timeout (in human terms) so if this regresses and someone
+  // runs it by hand they're not sitting there forever wondering why it's
+  // blocked, and get a useful error message instead. If it does get flaky,
+  // empirically increase the timeout to the point where it's very improbable.
+  EXPECT_EQ(42, p.getFuture().get(seconds(2)));
+}
+
+TEST(Timekeeper, futureGetTimeout) {
+  Promise<int> p;
+  EXPECT_THROW(p.getFuture().get(Duration(1)), folly::wangle::TimedOut);
+}
+
+TEST(Timekeeper, futureSleep) {
+  auto t1 = now();
+  futures::sleep(one_ms).get();
+  EXPECT_GE(now() - t1, one_ms);
+}
+
+TEST(Timekeeper, futureDelayed) {
+  auto t1 = now();
+  auto dur = makeFuture()
+    .delayed(one_ms)
+    .then([=]{ return now() - t1; })
+    .get();
+
+  EXPECT_GE(dur, one_ms);
+}