Flesh out Optional members swap, reset, emplace, has_value
[folly.git] / folly / Baton.h
index 86b2e013d0caf6785ec71bb2473658829dc0889e..833239153d8567fb356258c42f484db3ee8b0343 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#ifndef FOLLY_BATON_H
-#define FOLLY_BATON_H
+#pragma once
 
+#include <assert.h>
+#include <errno.h>
 #include <stdint.h>
 #include <atomic>
-#include <boost/noncopyable.hpp>
-#include <errno.h>
-#include <assert.h>
+#include <thread>
 
 #include <folly/detail/Futex.h>
 #include <folly/detail/MemoryIdler.h>
+#include <folly/portability/Asm.h>
 
 namespace folly {
 
-/// A Baton allows a thread to block once and be awoken: it captures
-/// a single handoff.  During its lifecycle (from construction/reset to
-/// destruction/reset) a baton must either be post()ed and wait()ed exactly
-/// once each, or not at all.
+/// A Baton allows a thread to block once and be awoken. The single
+/// poster version (with SinglePoster == true) captures a single
+/// handoff, and during its lifecycle (from construction/reset to
+/// destruction/reset) a baton must either be post()ed and wait()ed
+/// exactly once each, or not at all.
+///
+/// The multi-poster version (SinglePoster == false) allows multiple
+/// concurrent handoff attempts, the first of which completes the
+/// handoff and the rest if any are idempotent.
 ///
 /// Baton includes no internal padding, and is only 4 bytes in size.
 /// Any alignment or padding to avoid false sharing is up to the user.
 ///
-/// This is basically a stripped-down semaphore that supports only a
-/// single call to sem_post and a single call to sem_wait.  The current
-/// posix semaphore sem_t isn't too bad, but this provides more a bit more
-/// speed, inlining, smaller size, a guarantee that the implementation
-/// won't change, and compatibility with DeterministicSchedule.  By having
-/// a much more restrictive lifecycle we can also add a bunch of assertions
-/// that can help to catch race conditions ahead of time.
-template <template<typename> class Atom = std::atomic>
-struct Baton : boost::noncopyable {
-  Baton() : state_(INIT) {}
+/// This is basically a stripped-down semaphore that supports (only a
+/// single call to sem_post, when SinglePoster == true) and a single
+/// call to sem_wait.
+///
+/// The non-blocking version (Blocking == false) provides more speed
+/// by using only load acquire and store release operations in the
+/// critical path, at the cost of disallowing blocking and timing out.
+///
+/// The current posix semaphore sem_t isn't too bad, but this provides
+/// more a bit more speed, inlining, smaller size, a guarantee that
+/// the implementation won't change, and compatibility with
+/// DeterministicSchedule.  By having a much more restrictive
+/// lifecycle we can also add a bunch of assertions that can help to
+/// catch race conditions ahead of time.
+template <
+    template <typename> class Atom = std::atomic,
+    bool SinglePoster = true, //  single vs multiple posters
+    bool Blocking = true> // blocking vs spinning
+struct Baton {
+  constexpr Baton() : state_(INIT) {}
+
+  Baton(Baton const&) = delete;
+  Baton& operator=(Baton const&) = delete;
 
   /// It is an error to destroy a Baton on which a thread is currently
   /// wait()ing.  In practice this means that the waiter usually takes
   /// responsibility for destroying the Baton.
   ~Baton() {
-    // The docblock for this function says that is can't be called when
+    // The docblock for this function says that it can't be called when
     // there is a concurrent waiter.  We assume a strong version of this
     // requirement in which the caller must _know_ that this is true, they
     // are not allowed to be merely lucky.  If two threads are involved,
@@ -94,23 +112,81 @@ struct Baton : boost::noncopyable {
   }
 
   /// Causes wait() to wake up.  For each lifetime of a Baton (where a
-  /// lifetime starts at construction or reset() and ends at destruction
-  /// or reset()) there can be at most one call to post().  Any thread
-  /// may call post().
-  ///
-  /// Although we could implement a more generic semaphore semantics
-  /// without any extra size or CPU overhead, the single-call limitation
-  /// allows us to have better assert-ions during debug builds.
+  /// lifetime starts at construction or reset() and ends at
+  /// destruction or reset()) there can be at most one call to post(),
+  /// in the single poster version.  Any thread may call post().
   void post() {
-    uint32_t before = state_.load(std::memory_order_acquire);
-    assert(before == INIT || before == WAITING);
-    if (before != INIT ||
-        !state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
-      // we didn't get to state_ before wait(), so we need to call futex()
-      assert(before == WAITING);
+    if (!Blocking) {
+      /// Non-blocking version
+      ///
+      assert([&] {
+        auto state = state_.load(std::memory_order_relaxed);
+        return (state == INIT || state == EARLY_DELIVERY);
+      }());
+      state_.store(EARLY_DELIVERY, std::memory_order_release);
+      return;
+    }
+
+    /// Blocking versions
+    ///
+    if (SinglePoster) {
+      /// Single poster version
+      ///
+      uint32_t before = state_.load(std::memory_order_acquire);
+
+      assert(before == INIT || before == WAITING || before == TIMED_OUT);
+
+      if (before == INIT &&
+          state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+        return;
+      }
+
+      assert(before == WAITING || before == TIMED_OUT);
+
+      if (before == TIMED_OUT) {
+        return;
+      }
 
+      assert(before == WAITING);
       state_.store(LATE_DELIVERY, std::memory_order_release);
       state_.futexWake(1);
+    } else {
+      /// Multi-poster version
+      ///
+      while (true) {
+        uint32_t before = state_.load(std::memory_order_acquire);
+
+        if (before == INIT &&
+            state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+          return;
+        }
+
+        if (before == TIMED_OUT) {
+          return;
+        }
+
+        if (before == EARLY_DELIVERY || before == LATE_DELIVERY) {
+          // The reason for not simply returning (without the following
+          // atomic operation) is to avoid the following case:
+          //
+          //  T1:             T2:             T3:
+          //  local1.post();  local2.post();  global.wait();
+          //  global.post();  global.post();  local1.try_wait() == true;
+          //                                  local2.try_wait() == false;
+          //
+          if (state_.fetch_add(0) != before) {
+            continue;
+          }
+          return;
+        }
+
+        assert(before == WAITING);
+        if (!state_.compare_exchange_weak(before, LATE_DELIVERY)) {
+          continue;
+        }
+        state_.futexWake(1);
+        return;
+      }
     }
   }
 
@@ -124,30 +200,23 @@ struct Baton : boost::noncopyable {
   /// but by making this condition very restrictive we can provide better
   /// checking in debug builds.
   void wait() {
-    uint32_t before;
+    if (spinWaitForEarlyDelivery()) {
+      assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+      return;
+    }
 
-    static_assert(PreBlockAttempts > 0,
-        "isn't this assert clearer than an uninitialized variable warning?");
-    for (int i = 0; i < PreBlockAttempts; ++i) {
-      before = state_.load(std::memory_order_acquire);
-      if (before == EARLY_DELIVERY) {
-        // hooray!
-        return;
+    if (!Blocking) {
+      while (!try_wait()) {
+        std::this_thread::yield();
       }
-      assert(before == INIT);
-#if FOLLY_X64
-      // The pause instruction is the polite way to spin, but it doesn't
-      // actually affect correctness to omit it if we don't have it.
-      // Pausing donates the full capabilities of the current core to
-      // its other hyperthreads for a dozen cycles or so
-      asm volatile ("pause");
-#endif
+      return;
     }
 
     // guess we have to block :(
-    if (!state_.compare_exchange_strong(before, WAITING)) {
+    uint32_t expected = INIT;
+    if (!state_.compare_exchange_strong(expected, WAITING)) {
       // CAS failed, last minute reprieve
-      assert(before == EARLY_DELIVERY);
+      assert(expected == EARLY_DELIVERY);
       return;
     }
 
@@ -181,12 +250,79 @@ struct Baton : boost::noncopyable {
     }
   }
 
+  /// Similar to wait, but with a timeout. The thread is unblocked if the
+  /// timeout expires.
+  /// Note: Only a single call to timed_wait/wait is allowed during a baton's
+  /// life-cycle (from construction/reset to destruction/reset). In other
+  /// words, after timed_wait the caller can't invoke wait/timed_wait/try_wait
+  /// again on the same baton without resetting it.
+  ///
+  /// @param  deadline      Time until which the thread can block
+  /// @return               true if the baton was posted to before timeout,
+  ///                       false otherwise
+  template <typename Clock, typename Duration = typename Clock::duration>
+  bool timed_wait(const std::chrono::time_point<Clock,Duration>& deadline) {
+    static_assert(Blocking, "Non-blocking Baton does not support timed wait.");
+
+    if (spinWaitForEarlyDelivery()) {
+      assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+      return true;
+    }
+
+    // guess we have to block :(
+    uint32_t expected = INIT;
+    if (!state_.compare_exchange_strong(expected, WAITING)) {
+      // CAS failed, last minute reprieve
+      assert(expected == EARLY_DELIVERY);
+      return true;
+    }
+
+    while (true) {
+      auto rv = state_.futexWaitUntil(WAITING, deadline);
+      if (rv == folly::detail::FutexResult::TIMEDOUT) {
+        state_.store(TIMED_OUT, std::memory_order_release);
+        return false;
+      }
+
+      uint32_t s = state_.load(std::memory_order_acquire);
+      assert(s == WAITING || s == LATE_DELIVERY);
+      if (s == LATE_DELIVERY) {
+        return true;
+      }
+    }
+  }
+
+  /// Similar to timed_wait, but with a duration.
+  template <typename Clock = std::chrono::steady_clock, typename Duration>
+  bool timed_wait(const Duration& duration) {
+    auto deadline = Clock::now() + duration;
+    return timed_wait(deadline);
+  }
+
+  /// Similar to wait, but doesn't block the thread if it hasn't been posted.
+  ///
+  /// try_wait has the following semantics:
+  /// - It is ok to call try_wait any number times on the same baton until
+  ///   try_wait reports that the baton has been posted.
+  /// - It is ok to call timed_wait or wait on the same baton if try_wait
+  ///   reports that baton hasn't been posted.
+  /// - If try_wait indicates that the baton has been posted, it is invalid to
+  ///   call wait, try_wait or timed_wait on the same baton without resetting
+  ///
+  /// @return       true if baton has been posted, false othewise
+  bool try_wait() const {
+    auto s = state_.load(std::memory_order_acquire);
+    assert(s == INIT || s == EARLY_DELIVERY);
+    return s == EARLY_DELIVERY;
+  }
+
  private:
   enum State : uint32_t {
     INIT = 0,
     EARLY_DELIVERY = 1,
     WAITING = 2,
     LATE_DELIVERY = 3,
+    TIMED_OUT = 4
   };
 
   enum {
@@ -206,9 +342,32 @@ struct Baton : boost::noncopyable {
     PreBlockAttempts = 300,
   };
 
+  // Spin for "some time" (see discussion on PreBlockAttempts) waiting
+  // for a post.
+  //
+  // @return       true if we received an early delivery during the wait,
+  //               false otherwise. If the function returns true then
+  //               state_ is guaranteed to be EARLY_DELIVERY
+  bool spinWaitForEarlyDelivery() {
+
+    static_assert(PreBlockAttempts > 0,
+        "isn't this assert clearer than an uninitialized variable warning?");
+    for (int i = 0; i < PreBlockAttempts; ++i) {
+      if (try_wait()) {
+        // hooray!
+        return true;
+      }
+      // The pause instruction is the polite way to spin, but it doesn't
+      // actually affect correctness to omit it if we don't have it.
+      // Pausing donates the full capabilities of the current core to
+      // its other hyperthreads for a dozen cycles or so
+      asm_volatile_pause();
+    }
+
+    return false;
+  }
+
   detail::Futex<Atom> state_;
 };
 
 } // namespace folly
-
-#endif