From 20808864dfde991fd740e6caefe632d4bf244d69 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Fri, 19 Jan 2018 09:18:41 -0800 Subject: [PATCH] Add TimedDrivableExecutor to folly. Summary: Adds a TimedDrivableExecutor implementation of DrivableExecutor that adds a driveUntil method. driveUntil is as drive, except that it takes a timepoint and will stop driving after that time to allow callers to time out more easily. Reviewed By: yfeldblum Differential Revision: D6658320 fbshipit-source-id: a75145748e78497ce107ae152f25729547883835 --- CMakeLists.txt | 1 + folly/Makefile.am | 2 + folly/executors/TimedDrivableExecutor.cpp | 70 ++++++++++++ folly/executors/TimedDrivableExecutor.h | 102 ++++++++++++++++++ .../test/TimedDrivableExecutorTest.cpp | 84 +++++++++++++++ 5 files changed, 259 insertions(+) create mode 100644 folly/executors/TimedDrivableExecutor.cpp create mode 100644 folly/executors/TimedDrivableExecutor.h create mode 100644 folly/executors/test/TimedDrivableExecutorTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6aa82b67..d5816165 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -371,6 +371,7 @@ if (BUILD_TESTS) TEST serial_executor_test SOURCES SerialExecutorTest.cpp TEST thread_pool_executor_test SOURCES ThreadPoolExecutorTest.cpp TEST threaded_executor_test SOURCES ThreadedExecutorTest.cpp + TEST timed_drivable_executor_test SOURCES TimedDrivableExecutorTest.cpp DIRECTORY executors/task_queue/test/ TEST unbounded_blocking_queue_test SOURCES UnboundedBlockingQueueTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 0dd44b1c..66bb93e7 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -103,6 +103,7 @@ nobase_follyinclude_HEADERS = \ executors/SerialExecutor.h \ executors/ThreadPoolExecutor.h \ executors/ThreadedExecutor.h \ + executors/TimedDrivableExecutor.h \ executors/task_queue/BlockingQueue.h \ executors/task_queue/LifoSemMPMCQueue.h \ executors/task_queue/PriorityLifoSemMPMCQueue.h \ @@ -541,6 +542,7 @@ libfolly_la_SOURCES = \ executors/SerialExecutor.cpp \ executors/ThreadPoolExecutor.cpp \ executors/ThreadedExecutor.cpp \ + executors/TimedDrivableExecutor.cpp \ executors/QueuedImmediateExecutor.cpp \ experimental/hazptr/hazptr.cpp \ experimental/hazptr/memory_resource.cpp \ diff --git a/folly/executors/TimedDrivableExecutor.cpp b/folly/executors/TimedDrivableExecutor.cpp new file mode 100644 index 00000000..4f1e21d7 --- /dev/null +++ b/folly/executors/TimedDrivableExecutor.cpp @@ -0,0 +1,70 @@ +/* + * Copyright 2018-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +namespace folly { + +void TimedDrivableExecutor::add(Func callback) { + queue_.enqueue(std::move(callback)); +} + +void TimedDrivableExecutor::drive() { + wait(); + run(); +} + +size_t TimedDrivableExecutor::run() { + size_t count = 0; + size_t n = queue_.size(); + + // If we have waited already, then func_ may have a value + if (func_) { + auto f = std::move(func_); + f(); + count = 1; + } + + while (count < n && queue_.try_dequeue(func_)) { + auto f = std::move(func_); + f(); + ++count; + } + + return count; +} + +size_t TimedDrivableExecutor::drain() { + size_t tasksRun = 0; + size_t tasksForSingleRun = 0; + while ((tasksForSingleRun = run()) != 0) { + tasksRun += tasksForSingleRun; + } + return tasksRun; +} + +void TimedDrivableExecutor::wait() { + if (!func_) { + queue_.dequeue(func_); + } +} + +} // namespace folly diff --git a/folly/executors/TimedDrivableExecutor.h b/folly/executors/TimedDrivableExecutor.h new file mode 100644 index 00000000..f6e741bc --- /dev/null +++ b/folly/executors/TimedDrivableExecutor.h @@ -0,0 +1,102 @@ +/* + * Copyright 2018-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace folly { + +/* + * A DrivableExecutor can be driven via its drive() method or its driveUntil() + * that drives until some time point. + */ +class TimedDrivableExecutor : public DrivableExecutor { + public: + /// Implements DrivableExecutor + void drive() override; + + // Make progress if there is work to do and return true. Otherwise return + // false. + bool try_drive() { + return try_wait() && run() > 0; + } + + // Make progress on this Executor's work. Acts as drive, except it will only + // wait for a period of timeout for work to be enqueued. If no work is + // enqueued by that point, it will return. + template + bool try_drive_for(const std::chrono::duration& timeout) { + return try_wait_for(timeout) && run() > 0; + } + + // Make progress on this Executor's work. Acts as drive, except it will only + // wait until deadline for work to be enqueued. If no work is enqueued by + // that point, it will return. + template + bool try_drive_until( + const std::chrono::time_point& deadline) { + return try_wait_until(deadline) && run() > 0; + } + + void add(Func) override; + + /// Do work. Returns the number of functions that were executed (maybe 0). + /// Non-blocking, in the sense that we don't wait for work (we can't + /// control whether one of the functions blocks). + /// This is stable, it will not chase an ever-increasing tail of work. + /// This also means, there may be more work available to perform at the + /// moment that this returns. + size_t run(); + + // Do work until there is no more work to do. + // Returns the number of functions that were executed (maybe 0). + // Unlike run, this method is not stable. It will chase an infinite tail of + // work so should be used with care. + // There will be no work available to perform at the moment that this + // returns. + size_t drain(); + + /// Wait for work to do. + void wait(); + + // Return true if there is work to do, false otherwise + bool try_wait() { + return func_ || queue_.try_dequeue(func_); + } + + /// Wait for work to do or for a period of timeout, whichever is sooner. + template + bool try_wait_for(const std::chrono::duration& timeout) { + return func_ || queue_.try_dequeue_for(func_, timeout); + } + + /// Wait for work to do or until deadline passes, whichever is sooner. + template + bool try_wait_until( + const std::chrono::time_point& deadline) { + return func_ || queue_.try_dequeue_until(func_, deadline); + } + + private: + UMPSCQueue queue_; + Func func_; +}; + +} // namespace folly diff --git a/folly/executors/test/TimedDrivableExecutorTest.cpp b/folly/executors/test/TimedDrivableExecutorTest.cpp new file mode 100644 index 00000000..fa3469e2 --- /dev/null +++ b/folly/executors/test/TimedDrivableExecutorTest.cpp @@ -0,0 +1,84 @@ +/* + * Copyright 2018-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +using namespace folly; + +TEST(TimedDrivableExecutor, runIsStable) { + TimedDrivableExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + auto f2 = [&]() { + x.add(f1); + x.add(f1); + }; + x.add(f2); + x.run(); + EXPECT_EQ(count, 0); +} + +TEST(TimedDrivableExecutor, drainIsNotStable) { + TimedDrivableExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + auto f2 = [&]() { + x.add(f1); + x.add(f1); + }; + x.add(f2); + x.drain(); + EXPECT_EQ(count, 2); +} + +TEST(TimedDrivableExecutor, try_drive) { + TimedDrivableExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + x.try_drive(); + EXPECT_EQ(count, 0); + x.add(f1); + x.try_drive(); + EXPECT_EQ(count, 1); +} + +TEST(TimedDrivableExecutor, try_drive_for) { + TimedDrivableExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + x.try_drive_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); + x.add(f1); + x.try_drive_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 1); +} + +TEST(TimedDrivableExecutor, try_drive_until) { + TimedDrivableExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + x.try_drive_until( + std::chrono::system_clock::now() + std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); + x.add(f1); + x.try_drive_until( + std::chrono::system_clock::now() + std::chrono::milliseconds(100)); + EXPECT_EQ(count, 1); +} -- 2.34.1