replace std::dequeue with UMPMCQueue in UnboundedBlockingQueue
authorYedidya Feldblum <yfeldblum@fb.com>
Sat, 9 Dec 2017 05:11:03 +0000 (21:11 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Sat, 9 Dec 2017 05:19:44 +0000 (21:19 -0800)
Summary: As above. Thanks magedm for giving us this beautiful piece of equipment!

Reviewed By: magedm

Differential Revision: D6488661

fbshipit-source-id: 95aa9646ca1ea937bb1d055e9baa037896c3161e

folly/executors/task_queue/UnboundedBlockingQueue.h

index 8fc8391..95fa94c 100644 (file)
 #pragma once
 
 #include <folly/Synchronized.h>
+#include <folly/concurrency/UnboundedQueue.h>
 #include <folly/executors/task_queue/BlockingQueue.h>
 #include <folly/synchronization/LifoSem.h>
-#include <queue>
 
 namespace folly {
 
-// Warning: this is effectively just a std::deque wrapped in a single mutex
-// We are aiming to add a more performant concurrent unbounded queue in the
-// future, but this class is available if you must have an unbounded queue
-// and can tolerate any contention.
 template <class T>
 class UnboundedBlockingQueue : public BlockingQueue<T> {
  public:
   virtual ~UnboundedBlockingQueue() {}
 
   void add(T item) override {
-    queue_.wlock()->push(std::move(item));
+    queue_.enqueue(std::move(item));
     sem_.post();
   }
 
   T take() override {
-    while (true) {
-      {
-        auto ulockedQueue = queue_.ulock();
-        if (!ulockedQueue->empty()) {
-          auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
-          T item = std::move(wlockedQueue->front());
-          wlockedQueue->pop();
-          return item;
-        }
-      }
+    T item;
+    while (!queue_.try_dequeue(item)) {
       sem_.wait();
     }
+    return item;
   }
 
   size_t size() override {
-    return queue_.rlock()->size();
+    return queue_.size();
   }
 
  private:
   LifoSem sem_;
-  Synchronized<std::queue<T>> queue_;
+  UMPMCQueue<T, false> queue_;
 };
 
 } // namespace folly