From c3637eb86ac4fa24fbee20b39373011b50a05404 Mon Sep 17 00:00:00 2001 From: Sarang Masti Date: Mon, 9 Dec 2013 15:08:02 -0800 Subject: [PATCH] Add futexTimedWait Summary: Add futexTimedWait to Futex which allows callers to wait on the futex for a specified max duration. Test Plan: -- Ran all unitests Reviewed By: ngbronson@fb.com FB internal diff: D1090115 --- folly/detail/Futex.cpp | 40 +++++++++ folly/detail/Futex.h | 122 +++++++++++++++++++++++--- folly/test/DeterministicSchedule.cpp | 51 +++++++++++ folly/test/DeterministicSchedule.h | 18 ++++ folly/test/FutexTest.cpp | 123 +++++++++++++++++++++++++++ 5 files changed, 344 insertions(+), 10 deletions(-) create mode 100644 folly/detail/Futex.cpp diff --git a/folly/detail/Futex.cpp b/folly/detail/Futex.cpp new file mode 100644 index 00000000..206df408 --- /dev/null +++ b/folly/detail/Futex.cpp @@ -0,0 +1,40 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace folly { namespace detail { + +/* see Futex.h */ +FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) { + if (returnVal == 0) { + return FutexResult::AWOKEN; + } + switch(futexErrno) { + case ETIMEDOUT: + return FutexResult::TIMEDOUT; + case EINTR: + return FutexResult::INTERRUPTED; + case EWOULDBLOCK: + return FutexResult::VALUE_CHANGED; + default: + assert(false); + /* Shouldn't reach here. Just return one of the FutexResults */ + return FutexResult::VALUE_CHANGED; + } +} + +}} diff --git a/folly/detail/Futex.h b/folly/detail/Futex.h index 93544bc2..50eba88a 100644 --- a/folly/detail/Futex.h +++ b/folly/detail/Futex.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -25,8 +26,22 @@ #include #include +using std::chrono::steady_clock; +using std::chrono::system_clock; +using std::chrono::time_point; + namespace folly { namespace detail { +enum class FutexResult { + VALUE_CHANGED, /* Futex value didn't match expected */ + AWOKEN, /* futex wait matched with a futex wake */ + INTERRUPTED, /* Spurious wake-up or signal caused futex wait failure */ + TIMEDOUT +}; + +/* Converts return value and errno from a futex syscall to a FutexResult */ +FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno); + /** * Futex is an atomic 32 bit unsigned integer that provides access to the * futex() syscall on that value. It is templated in such a way that it @@ -46,25 +61,69 @@ struct Futex : Atom, boost::noncopyable { * other return (signal, this->load() != expected, or spurious wakeup). */ bool futexWait(uint32_t expected, uint32_t waitMask = -1); + /** Similar to futexWait but also accepts a timeout that gives the time until + * when the call can block (time is the absolute time i.e time since epoch). + * Allowed clock types: std::chrono::system_clock, std::chrono::steady_clock. + * Returns one of FutexResult values. + * + * NOTE: On some systems steady_clock is just an alias for system_clock, + * and is not actually steady.*/ + template + FutexResult futexWaitUntil(uint32_t expected, + const time_point& absTime, + uint32_t waitMask = -1); + /** Wakens up to count waiters where (waitMask & wakeMask) != 0, * returning the number of awoken threads. */ int futexWake(int count = std::numeric_limits::max(), uint32_t wakeMask = -1); + + private: + + /** Futex wait implemented via syscall SYS_futex. absTimeout gives + * time till when the wait can block. If it is nullptr the call will + * block until a matching futex wake is received. extraOpFlags can be + * used to specify addtional flags to add to the futex operation (by + * default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included). + * Returns 0 on success or -1 on error, with errno set to one of the + * values listed in futex(2). */ + int futexWaitImpl(uint32_t expected, + const struct timespec* absTimeout, + int extraOpFlags, + uint32_t waitMask); }; +template <> +inline int +Futex::futexWaitImpl(uint32_t expected, + const struct timespec* absTimeout, + int extraOpFlags, + uint32_t waitMask) { + assert(sizeof(*this) == sizeof(int)); + + /* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout + * value - http://locklessinc.com/articles/futex_cheat_sheet/ */ + int rv = syscall( + SYS_futex, + this, /* addr1 */ + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */ + expected, /* val */ + absTimeout, /* timeout */ + nullptr, /* addr2 */ + waitMask); /* val3 */ + + assert(rv == 0 || + errno == EWOULDBLOCK || + errno == EINTR || + (absTimeout != nullptr && errno == ETIMEDOUT)); + + return rv; +} + template <> inline bool Futex::futexWait(uint32_t expected, uint32_t waitMask) { - assert(sizeof(*this) == sizeof(int)); - int rv = syscall(SYS_futex, - this, /* addr1 */ - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, /* op */ - expected, /* val */ - nullptr, /* timeout */ - nullptr, /* addr2 */ - waitMask); /* val3 */ - assert(rv == 0 || (errno == EWOULDBLOCK || errno == EINTR)); - return rv == 0; + return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0; } template <> @@ -81,4 +140,47 @@ inline int Futex::futexWake(int count, uint32_t wakeMask) { return rv; } +/* Convert std::chrono::time_point to struct timespec */ +template +struct timespec timePointToTimeSpec(const time_point& tp) { + using std::chrono::nanoseconds; + using std::chrono::seconds; + using std::chrono::duration_cast; + + struct timespec ts; + auto duration = tp.time_since_epoch(); + auto secs = duration_cast(duration); + auto nanos = duration_cast(duration - secs); + ts.tv_sec = secs.count(); + ts.tv_nsec = nanos.count(); + return ts; +} + +template class Atom> template +inline FutexResult +Futex::futexWaitUntil( + uint32_t expected, + const time_point& absTime, + uint32_t waitMask) { + + static_assert(std::is_same::value || + std::is_same::value, + "Only std::system_clock or std::steady_clock supported"); + + struct timespec absTimeSpec = timePointToTimeSpec(absTime); + int extraOpFlags = 0; + + /* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point + * from the system clock (CLOCK_REALTIME). This check also works correctly for + * broken glibc in which steady_clock is a typedef to system_clock.*/ + if (std::is_same::value) { + extraOpFlags = FUTEX_CLOCK_REALTIME; + } else { + assert(Clock::is_steady); + } + + const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask); + return futexErrnoToFutexResult(rv, errno); +} + }} diff --git a/folly/test/DeterministicSchedule.cpp b/folly/test/DeterministicSchedule.cpp index 873bcf78..f913b152 100644 --- a/folly/test/DeterministicSchedule.cpp +++ b/folly/test/DeterministicSchedule.cpp @@ -134,6 +134,14 @@ DeterministicSchedule::afterSharedAccess() { sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]); } +int +DeterministicSchedule::getRandNumber(int n) { + if (tls_sched) { + return tls_sched->scheduler_(n); + } + return std::rand() % n; +} + sem_t* DeterministicSchedule::beforeThreadCreate() { sem_t* s = new sem_t; @@ -247,6 +255,49 @@ bool Futex::futexWait(uint32_t expected, return rv; } +FutexResult futexWaitUntilImpl(Futex* futex, + uint32_t expected, uint32_t waitMask) { + if (futex == nullptr) { + return FutexResult::VALUE_CHANGED; + } + + bool rv = false; + int futexErrno = 0; + + DeterministicSchedule::beforeSharedAccess(); + futexLock.lock(); + if (futex->data == expected) { + auto& queue = futexQueues[futex]; + queue.push_back(std::make_pair(waitMask, &rv)); + auto ours = queue.end(); + ours--; + while (!rv) { + futexLock.unlock(); + DeterministicSchedule::afterSharedAccess(); + DeterministicSchedule::beforeSharedAccess(); + futexLock.lock(); + + // Simulate spurious wake-ups, timeouts each time with + // a 10% probability + if (DeterministicSchedule::getRandNumber(100) < 10) { + queue.erase(ours); + if (queue.empty()) { + futexQueues.erase(futex); + } + rv = false; + // Simulate ETIMEDOUT 90% of the time and other failures + // remaining time + futexErrno = + DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR; + break; + } + } + } + futexLock.unlock(); + DeterministicSchedule::afterSharedAccess(); + return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno); +} + template<> int Futex::futexWake(int count, uint32_t wakeMask) { int rv = 0; diff --git a/folly/test/DeterministicSchedule.h b/folly/test/DeterministicSchedule.h index 71cb1037..e4c587b6 100644 --- a/folly/test/DeterministicSchedule.h +++ b/folly/test/DeterministicSchedule.h @@ -124,6 +124,10 @@ class DeterministicSchedule : boost::noncopyable { /** Calls sem_wait(sem) as part of a deterministic schedule. */ static void wait(sem_t* sem); + /** Used scheduler_ to get a random number b/w [0, n). If tls_sched is + * not set-up it falls back to std::rand() */ + static int getRandNumber(int n); + private: static __thread sem_t* tls_sem; static __thread DeterministicSchedule* tls_sched; @@ -274,6 +278,20 @@ template<> bool Futex::futexWait(uint32_t expected, uint32_t waitMask); +/// This function ignores the time bound, and instead pseudo-randomly chooses +/// whether the timeout was reached. To do otherwise would not be deterministic. +FutexResult futexWaitUntilImpl(Futex *futex, + uint32_t expected, uint32_t waitMask); + +template<> template +FutexResult +Futex::futexWaitUntil( + uint32_t expected, + const time_point& absTimeUnused, + uint32_t waitMask) { + return futexWaitUntilImpl(this, expected, waitMask); +} + template<> int Futex::futexWake(int count, uint32_t wakeMask); diff --git a/folly/test/FutexTest.cpp b/folly/test/FutexTest.cpp index 5b2b1b85..37df56bf 100644 --- a/folly/test/FutexTest.cpp +++ b/folly/test/FutexTest.cpp @@ -17,13 +17,17 @@ #include "folly/detail/Futex.h" #include "folly/test/DeterministicSchedule.h" +#include #include #include #include +#include +#include using namespace folly::detail; using namespace folly::test; +using namespace std::chrono; typedef DeterministicSchedule DSched; @@ -45,14 +49,133 @@ void run_basic_tests() { DSched::join(thr); } +template class Atom> +void run_wait_until_tests(); + +template +void stdAtomicWaitUntilTests() { + Futex f(0); + + auto thrA = DSched::thread([&]{ + while (true) { + typename Clock::time_point nowPlus2s = Clock::now() + seconds(2); + auto res = f.futexWaitUntil(0, nowPlus2s); + EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN); + if (res == FutexResult::AWOKEN) { + break; + } + } + }); + + while (f.futexWake() != 1) { + std::this_thread::yield(); + } + + DSched::join(thrA); + + auto start = Clock::now(); + EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)), + FutexResult::TIMEDOUT); + LOG(INFO) << "Futex wait timed out after waiting for " + << duration_cast(Clock::now() - start).count() + << "ms"; +} + +template +void deterministicAtomicWaitUntilTests() { + Futex f(0); + + // Futex wait must eventually fail with either FutexResult::TIMEDOUT or + // FutexResult::INTERRUPTED + auto res = f.futexWaitUntil(0, Clock::now() + milliseconds(100)); + EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED); +} + +template <> +void run_wait_until_tests() { + stdAtomicWaitUntilTests(); + stdAtomicWaitUntilTests(); +} + +template <> +void run_wait_until_tests() { + deterministicAtomicWaitUntilTests(); + deterministicAtomicWaitUntilTests(); +} + +uint64_t diff(uint64_t a, uint64_t b) { + return a > b ? a - b : b - a; +} + +void run_system_clock_test() { + /* Test to verify that system_clock uses clock_gettime(CLOCK_REALTIME, ...) + * for the time_points */ + struct timespec ts; + const int maxIters = 1000; + int iter = 0; + uint64_t delta = 10000000 /* 10 ms */; + + /** The following loop is only to make the test more robust in the presence of + * clock adjustments that can occur. We just run the loop maxIter times and + * expect with very high probability that there will be atleast one iteration + * of the test during which clock adjustments > delta have not occurred. */ + while (iter < maxIters) { + uint64_t a = duration_cast(system_clock::now() + .time_since_epoch()).count(); + + clock_gettime(CLOCK_REALTIME, &ts); + uint64_t b = ts.tv_sec * 1000000000ULL + ts.tv_nsec; + + uint64_t c = duration_cast(system_clock::now() + .time_since_epoch()).count(); + + if (diff(a, b) <= delta && + diff(b, c) <= delta && + diff(a, c) <= 2 * delta) { + /* Success! system_clock uses CLOCK_REALTIME for time_points */ + break; + } + iter++; + } + EXPECT_TRUE(iter < maxIters); +} + +void run_steady_clock_test() { + /* Test to verify that steady_clock uses clock_gettime(CLOCK_MONOTONIC, ...) + * for the time_points */ + EXPECT_TRUE(steady_clock::is_steady); + + uint64_t A = duration_cast(steady_clock::now() + .time_since_epoch()).count(); + + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + uint64_t B = ts.tv_sec * 1000000000ULL + ts.tv_nsec; + + uint64_t C = duration_cast(steady_clock::now() + .time_since_epoch()).count(); + EXPECT_TRUE(A <= B && B <= C); +} + +TEST(Futex, clock_source) { + run_system_clock_test(); + + /* On some systems steady_clock is just an alias for system_clock. So, + * we must skip run_steady_clock_test if the two clocks are the same. */ + if (!std::is_same::value) { + run_steady_clock_test(); + } +} TEST(Futex, basic_live) { run_basic_tests(); + run_wait_until_tests(); } TEST(Futex, basic_deterministic) { DSched sched(DSched::uniform(0)); run_basic_tests(); + run_wait_until_tests(); } int main(int argc, char ** argv) { -- 2.34.1