From d1be08e3f87525f513c2d7a61c9c770afaef95bc Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Fri, 26 Dec 2014 15:18:11 -0800 Subject: [PATCH] (wangle) Timeouts basic Summary: Add basic timeout functionality. This adds `futures::sleep` which makes an async Future that finishes after the given duration, and `Future::get` which blocks on the result and takes an optional timeout. Introducing the folly::wangle::futures namespace (soon to be just folly::futures) which will hold our wangle utility functions, the things that live in the Future object in Twitter's scala code. We'll probably move when* and wait-ish methods in here too, and perhaps alias makeFuture-ish methods too, though James has me mostly convinced not to deprecate them at the folly::wangle level (because they're basically Future constructors and Future lives at folly::wangle) `Future::delayed` after Twitter's helper of the same name Test Plan: new and old unit tests Reviewed By: davejwatson@fb.com Subscribers: trunkagent, fugalh, exa, folly-diffs@ FB internal diff: D1748894 Tasks: 4548494 Signature: t1:1748894:1419363496:1f4a62ec8455989c1fcce845695ace1d01c101c8 --- folly/wangle/futures/Future-inl.h | 104 +++++++++++++++++- folly/wangle/futures/Future.cpp | 29 +++++ folly/wangle/futures/Future.h | 45 +++++++- folly/wangle/futures/Timekeeper.h | 90 +++++++++++++++ folly/wangle/futures/WangleException.h | 5 + .../futures/detail/ThreadWheelTimekeeper.cpp | 93 ++++++++++++++++ .../futures/detail/ThreadWheelTimekeeper.h | 50 +++++++++ .../futures/detail/{Dummy.cpp => Types.h} | 12 +- folly/wangle/futures/test/TimekeeperTest.cpp | 90 +++++++++++++++ 9 files changed, 507 insertions(+), 11 deletions(-) create mode 100644 folly/wangle/futures/Future.cpp create mode 100644 folly/wangle/futures/Timekeeper.h create mode 100644 folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp create mode 100644 folly/wangle/futures/detail/ThreadWheelTimekeeper.h rename folly/wangle/futures/detail/{Dummy.cpp => Types.h} (79%) create mode 100644 folly/wangle/futures/test/TimekeeperTest.cpp diff --git a/folly/wangle/futures/Future-inl.h b/folly/wangle/futures/Future-inl.h index f2816b48..5b887bf8 100644 --- a/folly/wangle/futures/Future-inl.h +++ b/folly/wangle/futures/Future-inl.h @@ -19,11 +19,18 @@ #include #include -#include #include +#include +#include namespace folly { namespace wangle { +class Timekeeper; + +namespace detail { + Timekeeper* getTimekeeperSingleton(); +} + template struct isFuture { static const bool value = false; @@ -670,9 +677,9 @@ inline Future waitWithSemaphore(Future&& f) { return done; } -template +template Future -waitWithSemaphore(Future&& f, Duration timeout) { +waitWithSemaphore(Future&& f, Dur timeout) { auto baton = std::make_shared>(); auto done = f.then([baton](Try &&t) { baton->post(); @@ -682,9 +689,9 @@ waitWithSemaphore(Future&& f, Duration timeout) { return done; } -template +template Future -waitWithSemaphore(Future&& f, Duration timeout) { +waitWithSemaphore(Future&& f, Dur timeout) { auto baton = std::make_shared>(); auto done = f.then([baton](Try &&t) { baton->post(); @@ -694,6 +701,93 @@ waitWithSemaphore(Future&& f, Duration timeout) { return done; } +namespace { + template + void getWaitHelper(Future* f) { + // If we already have a value do the cheap thing + if (f->isReady()) { + return; + } + + folly::Baton<> baton; + f->then([&](Try const&) { + baton.post(); + }); + baton.wait(); + } + + template + Future getWaitTimeoutHelper(Future* f, Duration dur) { + // TODO make and use variadic whenAny #5877971 + Promise p; + auto token = std::make_shared>(); + folly::Baton<> baton; + + folly::wangle::detail::getTimekeeperSingleton()->after(dur) + .then([&,token](Try const& t) { + try { + t.value(); + if (token->exchange(true) == false) { + p.setException(TimedOut()); + baton.post(); + } + } catch (std::exception const& e) { + if (token->exchange(true) == false) { + p.setException(std::current_exception()); + baton.post(); + } + } + }); + + f->then([&, token](Try&& t) { + if (token->exchange(true) == false) { + p.fulfilTry(std::move(t)); + baton.post(); + } + }); + + baton.wait(); + return p.getFuture(); + } +} + +template +T Future::get() { + getWaitHelper(this); + + // Big assumption here: the then() call above, since it doesn't move out + // the value, leaves us with a value to return here. This would be a big + // no-no in user code, but I'm invoking internal developer privilege. This + // is slightly more efficient (save a move()) especially if there's an + // exception (save a throw). + return std::move(value()); +} + +template <> +inline void Future::get() { + getWaitHelper(this); +} + +template +T Future::get(Duration dur) { + return std::move(getWaitTimeoutHelper(this, dur).value()); +} + +template <> +inline void Future::get(Duration dur) { + getWaitTimeoutHelper(this, dur).value(); +} + +template +Future Future::delayed(Duration dur, Timekeeper* tk) +{ + return whenAll(*this, futures::sleep(dur, tk)) + .then([](Try, Try>>&& tup) { + Try& t = std::get<0>(tup.value()); + return makeFuture(std::move(t)); + }); +} + }} // I haven't included a Future specialization because I don't forsee us diff --git a/folly/wangle/futures/Future.cpp b/folly/wangle/futures/Future.cpp new file mode 100644 index 00000000..41bff823 --- /dev/null +++ b/folly/wangle/futures/Future.cpp @@ -0,0 +1,29 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +namespace folly { namespace wangle { namespace futures { + +Future sleep(Duration dur, Timekeeper* tk) { + if (LIKELY(!tk)) { + tk = detail::getTimekeeperSingleton(); + } + return tk->after(dur); +} + +}}} diff --git a/folly/wangle/futures/Future.h b/folly/wangle/futures/Future.h index 15349e39..a51c2949 100644 --- a/folly/wangle/futures/Future.h +++ b/folly/wangle/futures/Future.h @@ -26,9 +26,13 @@ #include #include #include +#include +#include namespace folly { namespace wangle { +template struct Promise; + namespace detail { template struct Core; @@ -85,12 +89,34 @@ struct Extract { typedef typename ArgType::FirstArg FirstArg; }; + } // detail -template struct Promise; +struct Timekeeper; template struct isFuture; +/// This namespace is for utility functions that would usually be static +/// members of Future, except they don't make sense there because they don't +/// depend on the template type (rather, on the type of their arguments in +/// some cases). This is the least-bad naming scheme we could think of. Some +/// of the functions herein have really-likely-to-collide names, like "map" +/// and "sleep". +namespace futures { + /// Returns a Future that will complete after the specified duration. The + /// Duration typedef of a `std::chrono` duration type indicates the + /// resolution you can expect to be meaningful (milliseconds at the time of + /// writing). Normally you wouldn't need to specify a Timekeeper, we will + /// use the global wangle timekeeper (we run a thread whose job it is to + /// keep time for wangle timeouts) but we provide the option for power + /// users. + /// + /// The Timekeeper thread will be lazily created the first time it is + /// needed. If your program never uses any timeouts or other time-based + /// Futures you will pay no Timekeeper thread overhead. + Future sleep(Duration, Timekeeper* = nullptr); +} + template class Future { public: @@ -154,6 +180,15 @@ class Future { /** A reference to the Try of the value */ Try& getTry(); + /// Block until the future is fulfilled. Returns the value (moved out), or + /// throws the exception. The future must not already have a callback. + T get(); + + /// Block until the future is fulfilled, or until timed out. Returns the + /// value (moved out), or throws the exception (which might be a TimedOut + /// exception). + T get(Duration dur); + /** When this Future has completed, execute func which is a function that takes a Try&&. A Future for the return type of func is returned. e.g. @@ -427,6 +462,10 @@ class Future { raise(FutureCancellation()); } + /// Delay the completion of this Future for at least this duration from + /// now. The optional Timekeeper is as with futures::sleep(). + Future delayed(Duration, Timekeeper* = nullptr); + private: typedef detail::Core* corePtr; @@ -568,8 +607,8 @@ Future waitWithSemaphore(Future&& f); * * Note: each call to this starts a (short-lived) thread and allocates memory. */ -template -Future waitWithSemaphore(Future&& f, Duration timeout); +template +Future waitWithSemaphore(Future&& f, Dur timeout); }} // folly::wangle diff --git a/folly/wangle/futures/Timekeeper.h b/folly/wangle/futures/Timekeeper.h new file mode 100644 index 00000000..f765fab6 --- /dev/null +++ b/folly/wangle/futures/Timekeeper.h @@ -0,0 +1,90 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { namespace wangle { + +template struct Future; + +/// A Timekeeper handles the details of keeping time and fulfilling delay +/// promises. The returned Future will either complete after the +/// elapsed time, or in the event of some kind of exceptional error may hold +/// an exception. These Futures respond to cancellation. If you use a lot of +/// Delays and many of them ultimately are unneeded (as would be the case for +/// Delays that are used to trigger timeouts of async operations), then you +/// can and should cancel them to reclaim resources. +/// +/// Users will typically get one of these via Future::sleep(Duration) or +/// use them implicitly behind the scenes by passing a timeout to some Future +/// operation. +/// +/// Although we don't formally alias Delay = Future, +/// that's an appropriate term for it. People will probably also call these +/// Timeouts, and that's ok I guess, but that term is so overloaded I thought +/// it made sense to introduce a cleaner term. +/// +/// Remember that Duration is a std::chrono duration (millisecond resolution +/// at the time of writing). +class Timekeeper { + public: + virtual ~Timekeeper() = default; + + /// Returns a future that will complete after the given duration with the + /// elapsed time. Exceptional errors can happen but they must be + /// exceptional. Use the steady (monotonic) clock. + /// + /// You may cancel this Future to reclaim resources. + /// + /// This future probably completes on the timer thread. You should almost + /// certainly follow it with a via() call or the accuracy of other timers + /// will suffer. + virtual Future after(Duration) = 0; + + /// Returns a future that will complete at the requested time. + /// + /// You may cancel this Future to reclaim resources. + /// + /// NB This is sugar for `after(when - now)`, so while you are welcome to + /// use a std::chrono::system_clock::time_point it will not track changes to + /// the system clock but rather execute that many milliseconds in the future + /// according to the steady clock. + template + Future at(std::chrono::time_point when); +}; + +}} + +// now get those definitions +#include + +// finally we can use Future +namespace folly { namespace wangle { + + template + Future Timekeeper::at(std::chrono::time_point when) { + auto now = Clock::now(); + + if (when <= now) { + return makeFuture(); + } + + return after(when - now); + } + +}} diff --git a/folly/wangle/futures/WangleException.h b/folly/wangle/futures/WangleException.h index bddf86c7..0d971303 100644 --- a/folly/wangle/futures/WangleException.h +++ b/folly/wangle/futures/WangleException.h @@ -86,4 +86,9 @@ class FutureCancellation : public WangleException { FutureCancellation() : WangleException("Future was cancelled") {} }; +class TimedOut : public WangleException { + public: + TimedOut() : WangleException("Timed out") {} +}; + }} diff --git a/folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp b/folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp new file mode 100644 index 00000000..d3c63155 --- /dev/null +++ b/folly/wangle/futures/detail/ThreadWheelTimekeeper.cpp @@ -0,0 +1,93 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ThreadWheelTimekeeper.h" + +#include +#include +#include + +namespace folly { namespace wangle { namespace detail { + +namespace { + Singleton timekeeperSingleton_; + + // Our Callback object for HHWheelTimer + struct WTCallback : public folly::HHWheelTimer::Callback { + // Only allow creation by this factory, to ensure heap allocation. + static WTCallback* create() { + // optimization opportunity: memory pool + return new WTCallback(); + } + + Future getFuture() { + return promise_.getFuture(); + } + + protected: + Promise promise_; + + explicit WTCallback() { + promise_.setInterruptHandler( + std::bind(&WTCallback::interruptHandler, this)); + } + + void timeoutExpired() noexcept override { + promise_.setValue(); + delete this; + } + + void interruptHandler() { + cancelTimeout(); + delete this; + } + }; + +} // namespace + + +ThreadWheelTimekeeper::ThreadWheelTimekeeper() : + thread_([this]{ eventBase_.loopForever(); }), + wheelTimer_(new HHWheelTimer(&eventBase_, std::chrono::milliseconds(1))) +{ + eventBase_.waitUntilRunning(); + eventBase_.runInEventBaseThread([this]{ + // 15 characters max + eventBase_.setName("FutureTimekeepr"); + }); +} + +ThreadWheelTimekeeper::~ThreadWheelTimekeeper() { + eventBase_.runInEventBaseThread([this]{ + wheelTimer_->cancelAll(); + }); + eventBase_.terminateLoopSoon(); + thread_.join(); +} + +Future ThreadWheelTimekeeper::after(Duration dur) { + auto cob = WTCallback::create(); + auto f = cob->getFuture(); + eventBase_.runInEventBaseThread([=]{ + wheelTimer_->scheduleTimeout(cob, dur); + }); + return f; +} + +Timekeeper* getTimekeeperSingleton() { + return timekeeperSingleton_.get_fast(); +} + +}}} diff --git a/folly/wangle/futures/detail/ThreadWheelTimekeeper.h b/folly/wangle/futures/detail/ThreadWheelTimekeeper.h new file mode 100644 index 00000000..35e895d8 --- /dev/null +++ b/folly/wangle/futures/detail/ThreadWheelTimekeeper.h @@ -0,0 +1,50 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { namespace detail { + +/// The default Timekeeper implementation which uses a HHWheelTimer on an +/// EventBase in a dedicated thread. Users needn't deal with this directly, it +/// is used by default by Future methods that work with timeouts. +class ThreadWheelTimekeeper : public Timekeeper { + public: + /// But it doesn't *have* to be a singleton. + ThreadWheelTimekeeper(); + ~ThreadWheelTimekeeper(); + + /// Implement the Timekeeper interface + /// This future *does* complete on the timer thread. You should almost + /// certainly follow it with a via() call or the accuracy of other timers + /// will suffer. + Future after(Duration) override; + + protected: + folly::EventBase eventBase_; + std::thread thread_; + HHWheelTimer::UniquePtr wheelTimer_; +}; + +Timekeeper* getTimekeeperSingleton(); + +}}} diff --git a/folly/wangle/futures/detail/Dummy.cpp b/folly/wangle/futures/detail/Types.h similarity index 79% rename from folly/wangle/futures/detail/Dummy.cpp rename to folly/wangle/futures/detail/Types.h index 02a58d4f..57b93e05 100644 --- a/folly/wangle/futures/detail/Dummy.cpp +++ b/folly/wangle/futures/detail/Types.h @@ -14,6 +14,12 @@ * limitations under the License. */ -// fbbuild is too dumb to know that .h files in the directory affect -// our project, unless we have a .cpp file in the target, in the same -// directory. +#pragma once + +#include + +namespace folly { namespace wangle { + +using Duration = std::chrono::milliseconds; + +}} diff --git a/folly/wangle/futures/test/TimekeeperTest.cpp b/folly/wangle/futures/test/TimekeeperTest.cpp new file mode 100644 index 00000000..5d2a359d --- /dev/null +++ b/folly/wangle/futures/test/TimekeeperTest.cpp @@ -0,0 +1,90 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +using namespace folly::wangle; +using namespace std::chrono; +using folly::wangle::Timekeeper; +using Duration = folly::wangle::Duration; + +std::chrono::milliseconds const one_ms(1); +std::chrono::milliseconds const awhile(10); + +std::chrono::steady_clock::time_point now() { + return std::chrono::steady_clock::now(); +} + +struct TimekeeperFixture : public testing::Test { + TimekeeperFixture() : + timeLord_(folly::wangle::detail::getTimekeeperSingleton()) + {} + + Timekeeper* timeLord_; +}; + +TEST_F(TimekeeperFixture, after) { + Duration waited(0); + + auto t1 = now(); + auto f = timeLord_->after(awhile); + EXPECT_FALSE(f.isReady()); + f.get(); + auto t2 = now(); + + EXPECT_GE(t2 - t1, awhile); +} + +TEST(Timekeeper, futureGet) { + Promise p; + std::thread([&]{ p.setValue(42); }).detach(); + EXPECT_EQ(42, p.getFuture().get()); +} + +TEST(Timekeeper, futureGetBeforeTimeout) { + Promise p; + std::thread([&]{ p.setValue(42); }).detach(); + // Technically this is a race and if the test server is REALLY overloaded + // and it takes more than a second to do that thread it could be flaky. But + // I want a low timeout (in human terms) so if this regresses and someone + // runs it by hand they're not sitting there forever wondering why it's + // blocked, and get a useful error message instead. If it does get flaky, + // empirically increase the timeout to the point where it's very improbable. + EXPECT_EQ(42, p.getFuture().get(seconds(2))); +} + +TEST(Timekeeper, futureGetTimeout) { + Promise p; + EXPECT_THROW(p.getFuture().get(Duration(1)), folly::wangle::TimedOut); +} + +TEST(Timekeeper, futureSleep) { + auto t1 = now(); + futures::sleep(one_ms).get(); + EXPECT_GE(now() - t1, one_ms); +} + +TEST(Timekeeper, futureDelayed) { + auto t1 = now(); + auto dur = makeFuture() + .delayed(one_ms) + .then([=]{ return now() - t1; }) + .get(); + + EXPECT_GE(dur, one_ms); +} -- 2.34.1