From 339a9d37df481d7f194ba394a95bd6aed7b39847 Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Wed, 4 Jun 2014 15:38:24 -0700 Subject: [PATCH] (folly) QueuedImmediateExecutor Summary: Add the `QueuedImmediateExecutor` which behaves like `InlineExecutor` but with different (and usually better) ordering semantics for nested calls. @override-unit-failures Test Plan: unit tests Reviewed By: davejwatson@fb.com Subscribers: folly@lists, net-systems@, fugalh, exa FB internal diff: D1364904 Tasks: 3789661 --- folly/wangle/Executor.h | 21 +++++++-- folly/wangle/InlineExecutor.h | 3 ++ folly/wangle/QueuedImmediateExecutor.cpp | 38 +++++++++++++++++ folly/wangle/QueuedImmediateExecutor.h | 33 +++++++++++++++ folly/wangle/test/ExecutorTest.cpp | 54 +++++++++++++++++++++++- 5 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 folly/wangle/QueuedImmediateExecutor.cpp create mode 100644 folly/wangle/QueuedImmediateExecutor.h diff --git a/folly/wangle/Executor.h b/folly/wangle/Executor.h index 8cbd089a..a248c28e 100644 --- a/folly/wangle/Executor.h +++ b/folly/wangle/Executor.h @@ -17,13 +17,16 @@ #pragma once #include -#include #include +#include +#include namespace folly { namespace wangle { - // Like an Rx Scheduler. We should probably rename it to match now that it - // has scheduling semantics too, but that's a codemod for another lazy - // summer afternoon. + /// An Executor accepts units of work with add(), which should be + /// threadsafe. + /// Like an Rx Scheduler. We should probably rename it to match now that it + /// has scheduling semantics too, but that's a codemod for another lazy + /// summer afternoon. class Executor : boost::noncopyable { public: typedef std::function Action; @@ -38,6 +41,16 @@ namespace folly { namespace wangle { /// schedule variants must be threadsafe. virtual void add(Action&&) = 0; + /// A convenience function for shared_ptr to legacy functors. + /// + /// Sometimes you have a functor that is move-only, and therefore can't be + /// converted to a std::function (e.g. std::packaged_task). In that case, + /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this. + template + void addPtr(P fn) { + this->add([fn]() mutable { (*fn)(); }); + } + /// Alias for add() (for Rx consistency) void schedule(Action&& a) { add(std::move(a)); } diff --git a/folly/wangle/InlineExecutor.h b/folly/wangle/InlineExecutor.h index 8c338b15..f86e48bd 100644 --- a/folly/wangle/InlineExecutor.h +++ b/folly/wangle/InlineExecutor.h @@ -19,6 +19,9 @@ namespace folly { namespace wangle { + /// When work is "queued", execute it immediately inline. + /// Usually when you think you want this, you actually want a + /// QueuedImmediateExecutor. class InlineExecutor : public Executor { public: void add(std::function&& f) override { diff --git a/folly/wangle/QueuedImmediateExecutor.cpp b/folly/wangle/QueuedImmediateExecutor.cpp new file mode 100644 index 00000000..2df5bd3a --- /dev/null +++ b/folly/wangle/QueuedImmediateExecutor.cpp @@ -0,0 +1,38 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QueuedImmediateExecutor.h" +#include "folly/ThreadLocal.h" +#include + +namespace folly { namespace wangle { + +void QueuedImmediateExecutor::add(Action&& callback) +{ + thread_local std::queue q; + + if (q.empty()) { + q.push(std::move(callback)); + while (!q.empty()) { + q.front()(); + q.pop(); + } + } else { + q.push(callback); + } +} + +}} // namespace diff --git a/folly/wangle/QueuedImmediateExecutor.h b/folly/wangle/QueuedImmediateExecutor.h new file mode 100644 index 00000000..40e08315 --- /dev/null +++ b/folly/wangle/QueuedImmediateExecutor.h @@ -0,0 +1,33 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "folly/wangle/Executor.h" + +namespace folly { namespace wangle { + +/** + * Runs inline like InlineExecutor, but with a queue so that any tasks added + * to this executor by one of its own callbacks will be queued instead of + * executed inline (nested). This is usually better behavior than Inline. + */ +class QueuedImmediateExecutor : public Executor { + public: + void add(Action&&) override; +}; + +}} // namespace diff --git a/folly/wangle/test/ExecutorTest.cpp b/folly/wangle/test/ExecutorTest.cpp index 5230869f..423b7091 100644 --- a/folly/wangle/test/ExecutorTest.cpp +++ b/folly/wangle/test/ExecutorTest.cpp @@ -15,11 +15,13 @@ */ #include +#include "folly/wangle/InlineExecutor.h" #include "folly/wangle/ManualExecutor.h" +#include "folly/wangle/QueuedImmediateExecutor.h" -using namespace testing; using namespace folly::wangle; using namespace std::chrono; +using namespace testing; TEST(ManualExecutor, runIsStable) { ManualExecutor x; @@ -86,3 +88,53 @@ TEST(ManualExecutor, advanceNeg) { x.advance(microseconds(-1)); EXPECT_EQ(count, 0); } + +TEST(Executor, InlineExecutor) { + InlineExecutor x; + size_t counter = 0; + x.add([&]{ + x.add([&]{ + EXPECT_EQ(counter++, 0); + }); + EXPECT_EQ(counter++, 1); + }); + EXPECT_EQ(counter, 2); +} + +TEST(Executor, QueuedImmediateExecutor) { + QueuedImmediateExecutor x; + size_t counter = 0; + x.add([&]{ + x.add([&]{ + EXPECT_EQ(1, counter++); + }); + EXPECT_EQ(0, counter++); + }); + EXPECT_EQ(2, counter); +} + +TEST(Executor, Runnable) { + InlineExecutor x; + size_t counter = 0; + struct Runnable { + std::function fn; + void operator()() { fn(); } + }; + Runnable f; + f.fn = [&]{ counter++; }; + x.add(f); + EXPECT_EQ(counter, 1); +} + +TEST(Executor, RunnablePtr) { + InlineExecutor x; + struct Runnable { + std::function fn; + void operator()() { fn(); } + }; + size_t counter = 0; + auto fnp = std::make_shared(); + fnp->fn = [&]{ counter++; }; + x.addPtr(fnp); + EXPECT_EQ(counter, 1); +} -- 2.34.1