Adds folly queue parallel test cases
[folly.git] / folly / stress-test / stress-parallel-folly-queue.cpp
index 115c523193e609a67808cb34ba195d07ab6c027c..ba6ee797c6cae0aafe0fb2aacf2f62ed0ecd0274 100644 (file)
-#include <folly/concurrency/UnboundedQueue.h>
-#include <folly/concurrency/DynamicBoundedQueue.h>
-#include <folly/AtomicLinkedList.h>
-#include <folly/MPMCQueue.h>
-
-#include <gtest/gtest.h>
-
-#include <memory>
-
-namespace {
-
-// Unbounded queue
-size_t kUnboundedQueueEnqueueStride = 10000;
-size_t kUSPSCQueueEnqueueCount = 1200000000;
-size_t kUMPSCQueueEnqueueCount = 320000000;
-size_t kUSPMCQueueEnqueueCount = 320000000;
-size_t kUMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::USPSCQueue<size_t, false> USPSCQueue;
-typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
-typedef folly::USPMCQueue<size_t, false> USPMCQueue;
-typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
-
-// Dynamic bounded queue
-size_t kDynamicBoundedQueueEnqueueStride = 50000;
-size_t kDynamicBoundedQueueCapacity = 200000;
-size_t kDSPSCQueueEnqueueCount = 1200000000;
-size_t kDMPSCQueueEnqueueCount = 320000000;
-size_t kDSPMCQueueEnqueueCount = 320000000;
-size_t kDMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
-typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
-typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
-typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
+#include "queue_test.h"
+
+namespace folly_test {
+
+class FollyQueueEnqueueDequeueTest_Parallel : public cds_test::stress_fixture {
+protected:
+  static std::atomic_int producer_num;
+
+  // The milliseconds for consumers to wait when a failed try_dequeue happens.
+  static unsigned s_nConsumerWaitTime;
+  // For MPMC, half of the threads are producers, and the rest are consumers.
+  static unsigned s_nThreadCount;
+  // Unbounded queue
+  static size_t s_nUnboundedQueueEnqueueStride;
+  static size_t s_nUSPSCQueueEnqueueCount;
+  static size_t s_nUMPSCQueueEnqueueCount;
+  static size_t s_nUSPMCQueueEnqueueCount;
+  static size_t s_nUMPMCQueueEnqueueCount;
+
+  // Dynamic bounded queue
+  static size_t s_nDynamicBoundedQueueEnqueueStride;
+  static size_t s_nDynamicBoundedQueueCapacity;
+  static size_t s_nDSPSCQueueEnqueueCount;
+  static size_t s_nDMPSCQueueEnqueueCount;
+  static size_t s_nDSPMCQueueEnqueueCount;
+  static size_t s_nDMPMCQueueEnqueueCount;
+
+  // AtomicLinkedList
+  static size_t s_nAtomicLinkedListPassCount;
+
+  // MPMC Queue (linearizable)
+  static size_t s_nMPMCQueueEnqueueStride;
+  static size_t s_nMPMCQueueCapacity;
+  static size_t s_nMPMCQueueEnqueueCount;
+
+  static void SetUpTestCase() {
+    const cds_test::config &cfg = get_config("ParallelFollyQueue");
+    // Unbounded queue
+    GetConfigNonZeroExpected(ConsumerWaitTime, 200);
+    GetConfigNonZeroExpected(ThreadCount, 4);
+    GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
+    // Dynamic bounded queue
+    GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
+    GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
+    GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
+    // AtomicLinkedList
+    GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
+    // MPMC Queue (linearizable)
+    GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
+    GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
+  }
 
-// AtomicLinkedList
-size_t kAtomicLinkedListSize = 50000;
-size_t kAtomicLinkedListPassCount = 10000;
-typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
+  template <typename Queue, typename Type>
+  static void general_enqueue(Queue *q, const Type &elem) {
+    q->enqueue(elem);
+  }
 
-// MPMC Queue (linearizable)
-size_t kMPMCQueueEnqueueStride = 10000;
-size_t kMPMCQueueCapacity = 50000;
-size_t kMPMCQueueEnqueueCount = 500000000;
-typedef folly::MPMCQueue<size_t> MPMCQueue;
+  template <typename Queue, typename Type>
+  static bool general_try_dequeue(Queue *q, Type &result) {
+    return q->try_dequeue(result);
+  }
 
-}
+  // MPMC Specialization.
+  template <typename Type>
+  static void general_enqueue(MPMCQueue *q, const Type &elem) {
+    EXPECT_TRUE(q->write(elem));
+  }
 
-void run_atomic_linkedlist() {
-  for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
-    std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
-    bool in_order = true;
-    for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
-      list->insertHead(i);
-    }
-    size_t nSum = 0;
-    auto func = [&nSum] (size_t elem) { nSum += elem; };
-    if (in_order) {
-      list->sweep(func);
-    } else {
-      list->reverseSweep(func);
-    }
-    in_order = !in_order;
+  template <typename Type>
+  static bool general_try_dequeue(MPMCQueue *q, Type &result) {
+    return q->read(result);
+  }
 
-    size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
-    EXPECT_EQ(nSum, supposed_sum);
+  // AtomicLinkedList Specialization.
+  template <typename Type>
+  static void general_enqueue(AtomicLinkedList *q, const Type &elem) {
+    q->insertHead(elem);
   }
-}
 
-template <typename Queue>
-void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
-    size_t nNo = 0;
-    size_t pop_sum = 0;
-    while (nNo < enqueue_count) {
-      size_t curr_push_count =
-          std::min(enqueue_count - nNo, enqueue_stride);
-      for (size_t i = 0; i < curr_push_count; i++) {
-        q->enqueue(nNo++);
-      }
-      size_t res;
-      for (size_t i = 0; i < curr_push_count; i++) {
-        q->dequeue(res);
-        pop_sum += res;
+  template <typename Queue>
+  static void run_producer(Queue *q, size_t enqueue_count) {
+    for (size_t i = 0; i < enqueue_count; i++) {
+      size_t elem_to_push = rand(enqueue_count);
+      if (!elem_to_push) {
+        elem_to_push++;
       }
+      general_enqueue(q, elem_to_push);
     }
-    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
-    EXPECT_EQ (pop_sum, supposed_sum);
-}
+    producer_num.fetch_sub(1, std::memory_order_release);
+  }
 
-// MPMC Specialization.
-template <>
-void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
-    size_t nNo = 0;
-    size_t push_sum = 0;
-    size_t pop_sum = 0;
-    while (nNo < enqueue_count) {
-      size_t curr_push_count =
-          std::min(enqueue_count - nNo, enqueue_stride);
-      for (size_t i = 0; i < curr_push_count; i++) {
-        if (q->write(nNo)) {
-          push_sum += nNo;
-          nNo++;
+  template <typename Queue> static void run_consumer(Queue *q) {
+    size_t dequeue_sum = 0;
+    size_t result = 0;
+    while (true) {
+      if (!general_try_dequeue(q, result)) {
+        if (producer_num.load(std::memory_order_acquire) == 0) {
+          if (!general_try_dequeue(q, result)) {
+            // If all producers are done and we still dequeue to nothing,
+            // the consumer quits.
+            break;
+          }
         }
       }
-      size_t res;
-      while (q->read(res)) {
-        pop_sum += res;
+      dequeue_sum += result;
+    }
+    EXPECT_GT(dequeue_sum, 0);
+  }
+
+  template <typename QueueType>
+  static void FollyMPMCThreading(QueueType *q, size_t producer_cnt,
+                                 size_t producer_pass_count,
+                                 size_t consumer_cnt) {
+    producer_num.store(producer_cnt, std::memory_order_relaxed);
+    size_t total_thread_cnt = producer_cnt + consumer_cnt;
+    std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      if (i < producer_cnt) {
+        threads[i] =
+            std::thread(run_producer<QueueType>, q, producer_pass_count);
+      } else {
+        threads[i] = std::thread(run_consumer<QueueType>, q);
       }
     }
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      threads[i].join();
+    }
+  }
+};
 
-    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
-    EXPECT_EQ(pop_sum, supposed_sum);
+// Specialization for AtomicLinkedList
+template <>
+void FollyQueueEnqueueDequeueTest_Parallel::run_consumer(AtomicLinkedList *q) {
+  size_t dequeue_sum = 0;
+  auto func = [&dequeue_sum](size_t elem) { dequeue_sum += elem; };
+  while (true) {
+    q->sweep(func);
+    if (producer_num.load(std::memory_order_acquire) == 0) {
+      q->sweep(func);
+      // If all producers are done and we still dequeue to nothing,
+      // the consumer quits.
+      break;
+    }
+  }
+  EXPECT_GT(dequeue_sum, 0);
 }
 
-template <typename Queue>
-void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
+std::atomic_int FollyQueueEnqueueDequeueTest_Parallel::producer_num;
+
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nConsumerWaitTime;
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nThreadCount;
+// Unbounded queue
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUnboundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPMCQueueEnqueueCount;
+// Dynamic bounded queue
+size_t
+    FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPMCQueueEnqueueCount;
+// AtomicLinkedList
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nAtomicLinkedListPassCount;
+// MPMC Queue (linearizable)
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueCount;
+
+// Used as a MPSC queue.
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyAtomicLinkedList) {
+  typedef AtomicLinkedList Queue;
   std::unique_ptr<Queue> q(new Queue());
-  run_queue(q.get(), enqueue_count, enqueue_stride);
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nAtomicLinkedListPassCount,
+                     1);
 }
 
-template <typename Queue>
-void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
-                               size_t enqueue_stride) {
-  std::unique_ptr<Queue> q(new Queue(queue_capacity));
-  run_queue(q.get(), enqueue_count, enqueue_stride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyMPMCQueue) {
+  typedef MPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nMPMCQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }
 
-class FollyQueueEnqueueDequeueTest : public ::testing::Test {
-
-};
-
-TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
-  run_with_initial_capacity<MPMCQueue>(
-      kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPSC) {
+  typedef DSPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), 1, s_nDSPSCQueueEnqueueCount, 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
-  run_atomic_linkedlist();
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPSC) {
+  typedef DMPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nDMPSCQueueEnqueueCount, 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
-  run_without_initial_capacity<USPSCQueue>(
-      kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPMC) {
+  typedef DSPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), 1, s_nDSPMCQueueEnqueueCount, s_nThreadCount - 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
-  run_without_initial_capacity<UMPSCQueue>(
-      kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPMC) {
+  typedef DMPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nDMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
-  run_without_initial_capacity<USPMCQueue>(
-      kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPSC) {
+  typedef USPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), 1, s_nUSPSCQueueEnqueueCount, 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
-  run_without_initial_capacity<UMPMCQueue>(
-      kUMPMCQueueEnqueueCount,
-      kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPSC) {
+  typedef UMPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nUMPSCQueueEnqueueCount, 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
-  // DynamicBoundedQueue
-  run_with_initial_capacity<DSPSCQueue>(
-      kDynamicBoundedQueueCapacity,
-      kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPMC) {
+  typedef USPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), 1, s_nUSPMCQueueEnqueueCount, s_nThreadCount - 1);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
-  run_with_initial_capacity<DMPSCQueue>(
-      kDynamicBoundedQueueCapacity,
-      kDMPSCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
-}
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPMC) {
+  typedef UMPMCQueue Queue;
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
-  run_with_initial_capacity<DSPMCQueue>(
-      kDynamicBoundedQueueCapacity,
-      kDSPMCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nUMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }
 
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
-  run_with_initial_capacity<DMPMCQueue>(
-      kDynamicBoundedQueueCapacity,
-      kDMPMCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
-}
+} // namespace folly_test