UnboundedQueue: Change try_dequeue variants to fail only if the queue is empty
authorMaged Michael <magedmichael@fb.com>
Sat, 16 Dec 2017 22:13:47 +0000 (14:13 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Sat, 16 Dec 2017 22:37:22 +0000 (14:37 -0800)
Summary:
The current semantics of try_dequeue variants can cause problems in cases where based on external dependencies the consumer is guaranteed that the queue is not empty and depends on the success of try_dequeue. See https://github.com/facebook/wangle/commit/abd07a06cae09efcd4347a9a019d59ae8a013c3f

This diff changes the semantics so that try_dequeue operations fail only if the queue is empty.

Reviewed By: yfeldblum

Differential Revision: D6586156

fbshipit-source-id: 25d6085e28d3e24034ecf6a8bafab51c95464b01

folly/concurrency/UnboundedQueue.h

index 77bc5b9fa7d005dfb5f022608bffd1685fbfa115..221e73ce728fd4e153ae18f1ecb0c3b52da255dd 100644 (file)
@@ -377,11 +377,11 @@ class UnboundedQueue {
       // possible to call ~T() and it may happen to use hazard pointers.
       folly::hazptr::hazptr_holder hptr;
       Segment* s = hptr.get_protected(c_.head);
-      return ryDequeueUntilMC(s, item, deadline);
+      return tryDequeueUntilMC(s, item, deadline);
     }
   }
 
-  /** ryDequeueUntilSC */
+  /** tryDequeueUntilSC */
   template <typename Clock, typename Duration>
   FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
@@ -392,7 +392,7 @@ class UnboundedQueue {
     DCHECK_LT(t, (s->minTicket() + SegmentSize));
     size_t idx = index(t);
     Entry& e = s->entry(idx);
-    if (!e.tryWaitUntil(deadline)) {
+    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
       return false;
     }
     setConsumerTicket(t + 1);
@@ -405,7 +405,7 @@ class UnboundedQueue {
 
   /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
       Segment* s,
       T& item,
       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
@@ -420,7 +420,7 @@ class UnboundedQueue {
       }
       size_t idx = index(t);
       Entry& e = s->entry(idx);
-      if (!e.tryWaitUntil(deadline)) {
+      if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
         return false;
       }
       if (!c_.ticket.compare_exchange_weak(
@@ -435,6 +435,23 @@ class UnboundedQueue {
     }
   }
 
+  /** tryDequeueWaitElem */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
+      Entry& e,
+      Ticket t,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      if (LIKELY(e.tryWaitUntil(deadline))) {
+        return true;
+      }
+      if (t >= producerTicket()) {
+        return false;
+      }
+      asm_volatile_pause();
+    }
+  }
+
   /** findSegment */
   FOLLY_ALWAYS_INLINE
   Segment* findSegment(Segment* s, const Ticket t) const noexcept {