fix flaky ConnectTFOTimeout and ConnectTFOFallbackTimeout tests
[folly.git] / folly / MPMCQueue.h
index 05de4c7b10c7b39ed23f1268c35e74c0d24d259c..08b2e1b8f7e31cd86b9881c30311fcb1fe08ed7f 100644 (file)
 #include <limits>
 #include <string.h>
 #include <type_traits>
-#include <unistd.h>
 
 #include <folly/Traits.h>
 #include <folly/detail/CacheLocality.h>
 #include <folly/detail/TurnSequencer.h>
+#include <folly/portability/Unistd.h>
 
 namespace folly {
 
@@ -284,6 +284,21 @@ class MPMCQueue : boost::noncopyable {
     }
   }
 
+  template <class Clock, typename... Args>
+  bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
+                     Args&&... args) noexcept {
+    uint64_t ticket;
+    if (tryObtainPromisedPushTicketUntil(ticket, when)) {
+      // we have pre-validated that the ticket won't block, or rather that
+      // it won't block longer than it takes another thread to dequeue an
+      // element from the slot it identifies.
+      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   /// If the queue is not full, enqueues and returns true, otherwise
   /// returns false.  Unlike write this method can be blocked by another
   /// thread, specifically a read that has linearized (been assigned
@@ -471,6 +486,28 @@ class MPMCQueue : boost::noncopyable {
     }
   }
 
+  /// Tries until when to obtain a push ticket for which
+  /// SingleElementQueue::enqueue  won't block.  Returns true on success, false
+  /// on failure.
+  /// ticket is filled on success AND failure.
+  template <class Clock>
+  bool tryObtainPromisedPushTicketUntil(
+      uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+    bool deadlineReached = false;
+    while (!deadlineReached) {
+      if (tryObtainPromisedPushTicket(ticket)) {
+        return true;
+      }
+      // ticket is a blocking ticket until the preceding ticket has been
+      // processed: wait until this ticket's turn arrives. We have not reserved
+      // this ticket so we will have to re-attempt to get a non-blocking ticket
+      // if we wake up before we time-out.
+      deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
+          turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+    }
+    return false;
+  }
+
   /// Tries to obtain a push ticket which can be satisfied if all
   /// in-progress pops complete.  This function does not block, but
   /// blocking may be required when using the returned ticket if some
@@ -482,6 +519,7 @@ class MPMCQueue : boost::noncopyable {
       auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
       int64_t n = numPushes - numPops;
+      rv = numPushes;
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -489,7 +527,6 @@ class MPMCQueue : boost::noncopyable {
         return false;
       }
       if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
-        rv = numPushes;
         return true;
       }
     }
@@ -597,7 +634,7 @@ struct SingleElementQueue {
   template <typename = typename std::enable_if<
                 (folly::IsRelocatable<T>::value &&
                  boost::has_nothrow_constructor<T>::value) ||
-                std::is_nothrow_constructible<T,T&&>::value>::type>
+                std::is_nothrow_constructible<T, T&&>::value>::type>
   void enqueue(const uint32_t turn,
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
@@ -611,6 +648,20 @@ struct SingleElementQueue {
                                   ImplByMove, ImplByRelocation>::type());
   }
 
+  /// Waits until either:
+  /// 1: the dequeue turn preceding the given enqueue turn has arrived
+  /// 2: the given deadline has arrived
+  /// Case 1 returns true, case 2 returns false.
+  template <class Clock>
+  bool tryWaitForEnqueueTurnUntil(
+      const uint32_t turn,
+      Atom<uint32_t>& spinCutoff,
+      const bool updateSpinCutoff,
+      const std::chrono::time_point<Clock>& when) noexcept {
+    return sequencer_.tryWaitForTurn(
+        turn * 2, spinCutoff, updateSpinCutoff, &when);
+  }
+
   bool mayEnqueue(const uint32_t turn) const noexcept {
     return sequencer_.isTurn(turn * 2);
   }