Uses cdsstree framework in folly sequential queue test cases
[folly.git] / folly / stress-test / stress-sequential-folly-queue.cpp
index eade97fa420700246f327fdb7fd308bc81020d97..926652d06bf5da9c0706d4a92ff42d62e7fc2df4 100644 (file)
@@ -1,46 +1,86 @@
-#include <folly/concurrency/UnboundedQueue.h>
+#include "queue_test.h"
 
-#include <chrono>
-#include <cassert>
-#include <iostream>
-#include <memory>
+namespace folly_test {
 
-namespace {
+class FollyQueueEnqueueDequeueTest_Sequential
+    : public cds_test::stress_fixture {
+protected:
+  // Unbounded queue
+  static size_t s_nUnboundedQueueEnqueueStride;
+  static size_t s_nUSPSCQueueEnqueueCount;
+  static size_t s_nUMPSCQueueEnqueueCount;
+  static size_t s_nUSPMCQueueEnqueueCount;
+  static size_t s_nUMPMCQueueEnqueueCount;
 
-const char* kTestName = "EnqueueDequeue";
-size_t kEnqueueStride = 10000;
+  // Dynamic bounded queue
+  static size_t s_nDynamicBoundedQueueEnqueueStride;
+  static size_t s_nDynamicBoundedQueueCapacity;
+  static size_t s_nDSPSCQueueEnqueueCount;
+  static size_t s_nDMPSCQueueEnqueueCount;
+  static size_t s_nDSPMCQueueEnqueueCount;
+  static size_t s_nDMPMCQueueEnqueueCount;
 
-size_t kUSPSCQueueEnqueueCount = 1200000000;
-const char* kUSPSCQueueBenchmarkName = "FollyUSPSCQueue";
+  // AtomicLinkedList
+  static size_t s_nAtomicLinkedListSize;
+  static size_t s_nAtomicLinkedListPassCount;
 
-size_t kUMPSCQueueEnqueueCount = 320000000;
-const char* kUMPSCQueueBenchmarkName = "FollyUMPSCQueue";
+  // MPMC Queue (linearizable)
+  static size_t s_nMPMCQueueEnqueueStride;
+  static size_t s_nMPMCQueueCapacity;
+  static size_t s_nMPMCQueueEnqueueCount;
 
-size_t kUSPMCQueueEnqueueCount = 320000000;
-const char* kUSPMCQueueBenchmarkName = "FollyUSPMCQueue";
+  static void SetUpTestCase() {
+    const cds_test::config &cfg = get_config("SequentialFollyQueue");
+    // Unbounded queue
+    GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
+    // Dynamic bounded queue
+    GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
+    GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
+    GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
+    // AtomicLinkedList
+    GetConfigNonZeroExpected(AtomicLinkedListSize, 50000);
+    GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
+    // MPMC Queue (linearizable)
+    GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
+    GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
+  }
 
-size_t kUMPMCQueueEnqueueCount = 320000000;
-const char* kUMPMCQueueBenchmarkName = "FollyMPMCQueue";
-
-typedef folly::USPSCQueue<size_t, false> USPSCQueue;
-typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
-typedef folly::USPMCQueue<size_t, false> USPMCQueue;
-typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
-
-}
+  static void run_atomic_linkedlist() {
+    for (size_t pass = 0; pass < s_nAtomicLinkedListPassCount; pass++) {
+      std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
+      bool in_order = true;
+      for (size_t i = 0; i < s_nAtomicLinkedListSize; i++) {
+        list->insertHead(i);
+      }
+      size_t nSum = 0;
+      auto func = [&nSum](size_t elem) { nSum += elem; };
+      if (in_order) {
+        list->sweep(func);
+      } else {
+        list->reverseSweep(func);
+      }
+      in_order = !in_order;
 
-template <typename Queue>
-void run_queue(size_t enqueue_count, const char* bench_name) {
-    std::cout << "[ RUN      ] " << kTestName << "." << bench_name << std::endl;
-    auto start_time = std::chrono::system_clock::now();
+      size_t supposed_sum =
+          s_nAtomicLinkedListSize * (s_nAtomicLinkedListSize - 1) / 2;
+      EXPECT_EQ(nSum, supposed_sum);
+    }
+  }
 
+  template <typename Queue>
+  static void run_queue(Queue *q, size_t enqueue_count, size_t enqueue_stride) {
     size_t nNo = 0;
-    size_t push_failure = 0;
     size_t pop_sum = 0;
-               std::unique_ptr<Queue> q(new Queue());
     while (nNo < enqueue_count) {
-      size_t curr_push_count =
-          std::min(enqueue_count - nNo, kEnqueueStride);
+      size_t curr_push_count = std::min(enqueue_count - nNo, enqueue_stride);
       for (size_t i = 0; i < curr_push_count; i++) {
         q->enqueue(nNo++);
       }
@@ -50,29 +90,127 @@ void run_queue(size_t enqueue_count, const char* bench_name) {
         pop_sum += res;
       }
     }
+    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+    EXPECT_EQ(pop_sum, supposed_sum);
+  }
 
-    auto finish_time = std::chrono::system_clock::now();
-    auto dur = finish_time - start_time;
-    auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
+  template <typename Queue>
+  static void run_without_initial_capacity(size_t enqueue_count,
+                                           size_t enqueue_stride) {
+    std::unique_ptr<Queue> q(new Queue());
+    run_queue(q.get(), enqueue_count, enqueue_stride);
+  }
 
-    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
-    if (pop_sum != supposed_sum) {
-      std::cout << "Sequential queue pop sum: " << pop_sum
-                << " != " << supposed_sum << "\n";
-      std::cout << "[       FAILED ] " << kTestName << "." << bench_name
-                << " (" << milisecs.count() << " ms)" << std::endl;
-      assert(false && "Folly unbounded queue ERROR");
-    } else {
-        std::cout << "[       OK ] " << kTestName << "." << bench_name
-                  << " (" << milisecs.count() << " ms)" << std::endl;
+  template <typename Queue>
+  static void run_with_initial_capacity(size_t queue_capacity,
+                                        size_t enqueue_count,
+                                        size_t enqueue_stride) {
+    std::unique_ptr<Queue> q(new Queue(queue_capacity));
+    run_queue(q.get(), enqueue_count, enqueue_stride);
+  }
+};
+
+// MPMC Specialization.
+template <>
+void FollyQueueEnqueueDequeueTest_Sequential::run_queue(MPMCQueue *q,
+                                                        size_t enqueue_count,
+                                                        size_t enqueue_stride) {
+  size_t nNo = 0;
+  size_t push_sum = 0;
+  size_t pop_sum = 0;
+  while (nNo < enqueue_count) {
+    size_t curr_push_count = std::min(enqueue_count - nNo, enqueue_stride);
+    for (size_t i = 0; i < curr_push_count; i++) {
+      if (q->write(nNo)) {
+        push_sum += nNo;
+        nNo++;
+      }
     }
+    size_t res;
+    while (q->read(res)) {
+      pop_sum += res;
+    }
+  }
+
+  size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+  EXPECT_EQ(pop_sum, supposed_sum);
 }
 
-int main() {
-  run_queue<USPSCQueue>(kUSPSCQueueEnqueueCount, kUSPSCQueueBenchmarkName);
-  run_queue<UMPSCQueue>(kUMPSCQueueEnqueueCount, kUMPSCQueueBenchmarkName);
-  run_queue<USPMCQueue>(kUSPMCQueueEnqueueCount, kUSPMCQueueBenchmarkName);
-  run_queue<UMPMCQueue>(kUMPMCQueueEnqueueCount, kUMPMCQueueBenchmarkName);
+// Unbounded queue
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUnboundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUMPMCQueueEnqueueCount;
+// Dynamic bounded queue
+size_t FollyQueueEnqueueDequeueTest_Sequential::
+    s_nDynamicBoundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDynamicBoundedQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDMPMCQueueEnqueueCount;
+// AtomicLinkedList
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nAtomicLinkedListSize;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nAtomicLinkedListPassCount;
+// MPMC Queue (linearizable)
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueEnqueueCount;
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyMPMCQueue) {
+  run_with_initial_capacity<MPMCQueue>(s_nMPMCQueueCapacity,
+                                       s_nMPMCQueueEnqueueCount,
+                                       s_nMPMCQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyAtomicLinkedList) {
+  run_atomic_linkedlist();
+}
 
-  return 0;
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_SPSC) {
+  run_without_initial_capacity<USPSCQueue>(s_nUSPSCQueueEnqueueCount,
+                                           s_nUnboundedQueueEnqueueStride);
 }
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_MPSC) {
+  run_without_initial_capacity<UMPSCQueue>(s_nUMPSCQueueEnqueueCount,
+                                           s_nUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_SPMC) {
+  run_without_initial_capacity<USPMCQueue>(s_nUSPMCQueueEnqueueCount,
+                                           s_nUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_MPMC) {
+  run_without_initial_capacity<UMPMCQueue>(s_nUMPMCQueueEnqueueCount,
+                                           s_nUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_SPSC) {
+  // DynamicBoundedQueue
+  run_with_initial_capacity<DSPSCQueue>(s_nDynamicBoundedQueueCapacity,
+                                        s_nDSPSCQueueEnqueueCount,
+                                        s_nDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_MPSC) {
+  run_with_initial_capacity<DMPSCQueue>(s_nDynamicBoundedQueueCapacity,
+                                        s_nDMPSCQueueEnqueueCount,
+                                        s_nDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_SPMC) {
+  run_with_initial_capacity<DSPMCQueue>(s_nDynamicBoundedQueueCapacity,
+                                        s_nDSPMCQueueEnqueueCount,
+                                        s_nDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_MPMC) {
+  run_with_initial_capacity<DMPMCQueue>(s_nDynamicBoundedQueueCapacity,
+                                        s_nDMPMCQueueEnqueueCount,
+                                        s_nDynamicBoundedQueueEnqueueStride);
+}
+
+} // namespace folly_test