Uses cmake to build folly test drivers
[folly.git] / folly / stress-test / stress-parallel-folly-queue.cpp
diff --git a/folly/stress-test/stress-parallel-folly-queue.cpp b/folly/stress-test/stress-parallel-folly-queue.cpp
new file mode 100644 (file)
index 0000000..70bcb15
--- /dev/null
@@ -0,0 +1,196 @@
+#include <folly/concurrency/UnboundedQueue.h>
+#include <folly/concurrency/DynamicBoundedQueue.h>
+#include <folly/AtomicLinkedList.h>
+#include <folly/MPMCQueue.h>
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+namespace {
+
+// Unbounded queue
+size_t kUnboundedQueueEnqueueStride = 10000;
+size_t kUSPSCQueueEnqueueCount = 1200000000;
+size_t kUMPSCQueueEnqueueCount = 320000000;
+size_t kUSPMCQueueEnqueueCount = 320000000;
+size_t kUMPMCQueueEnqueueCount = 320000000;
+
+typedef folly::USPSCQueue<size_t, false> USPSCQueue;
+typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
+typedef folly::USPMCQueue<size_t, false> USPMCQueue;
+typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
+
+// Dynamic bounded queue
+size_t kDynamicBoundedQueueEnqueueStride = 50000;
+size_t kDynamicBoundedQueueCapacity = 200000;
+size_t kDSPSCQueueEnqueueCount = 1200000000;
+size_t kDMPSCQueueEnqueueCount = 320000000;
+size_t kDSPMCQueueEnqueueCount = 320000000;
+size_t kDMPMCQueueEnqueueCount = 320000000;
+
+typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
+typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
+typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
+typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
+
+// AtomicLinkedList
+size_t kAtomicLinkedListSize = 50000;
+size_t kAtomicLinkedListPassCount = 10000;
+typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
+
+// MPMC Queue (linearizable)
+size_t kMPMCQueueEnqueueStride = 10000;
+size_t kMPMCQueueCapacity = 50000;
+size_t kMPMCQueueEnqueueCount = 500000000;
+typedef folly::MPMCQueue<size_t> MPMCQueue;
+
+}
+
+void run_atomic_linkedlist() {
+  for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
+    std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
+    bool in_order = true;
+    for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
+      list->insertHead(i);
+    }
+    size_t nSum = 0;
+    auto func = [&nSum] (size_t elem) { nSum += elem; };
+    if (in_order) {
+      list->sweep(func);
+    } else {
+      list->reverseSweep(func);
+    }
+    in_order = !in_order;
+
+    size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
+    EXPECT_EQ(nSum, supposed_sum);
+  }
+}
+
+template <typename Queue>
+void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
+    size_t nNo = 0;
+    size_t pop_sum = 0;
+    while (nNo < enqueue_count) {
+      size_t curr_push_count =
+          std::min(enqueue_count - nNo, enqueue_stride);
+      for (size_t i = 0; i < curr_push_count; i++) {
+        q->enqueue(nNo++);
+      }
+      size_t res;
+      for (size_t i = 0; i < curr_push_count; i++) {
+        q->dequeue(res);
+        pop_sum += res;
+      }
+    }
+    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+    EXPECT_EQ (pop_sum, supposed_sum);
+}
+
+// MPMC Specialization.
+template <>
+void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
+    size_t nNo = 0;
+    size_t push_sum = 0;
+    size_t pop_sum = 0;
+    while (nNo < enqueue_count) {
+      size_t curr_push_count =
+          std::min(enqueue_count - nNo, enqueue_stride);
+      for (size_t i = 0; i < curr_push_count; i++) {
+        if (q->write(nNo)) {
+          push_sum += nNo;
+          nNo++;
+        }
+      }
+      size_t res;
+      while (q->read(res)) {
+        pop_sum += res;
+      }
+    }
+
+    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+    EXPECT_EQ(pop_sum, supposed_sum);
+}
+
+template <typename Queue>
+void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
+  std::unique_ptr<Queue> q(new Queue());
+  run_queue(q.get(), enqueue_count, enqueue_stride);
+}
+
+template <typename Queue>
+void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
+                               size_t enqueue_stride) {
+  std::unique_ptr<Queue> q(new Queue(queue_capacity));
+  run_queue(q.get(), enqueue_count, enqueue_stride);
+}
+
+class FollyQueueEnqueueDequeueTest : public ::testing::Test {
+
+};
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
+  run_with_initial_capacity<MPMCQueue>(
+      kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
+  run_atomic_linkedlist();
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
+  run_without_initial_capacity<USPSCQueue>(
+      kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
+  run_without_initial_capacity<UMPSCQueue>(
+      kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
+  run_without_initial_capacity<USPMCQueue>(
+      kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
+  run_without_initial_capacity<UMPMCQueue>(
+      kUMPMCQueueEnqueueCount,
+      kUnboundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
+  // DynamicBoundedQueue
+  run_with_initial_capacity<DSPSCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
+  run_with_initial_capacity<DMPSCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDMPSCQueueEnqueueCount,
+      kDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
+  run_with_initial_capacity<DSPMCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDSPMCQueueEnqueueCount,
+      kDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
+  run_with_initial_capacity<DMPMCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDMPMCQueueEnqueueCount,
+      kDynamicBoundedQueueEnqueueStride);
+}
+
+int main(int argc, char** argv) {
+  // Init Google test
+  ::testing::InitGoogleTest(&argc, argv);
+  int result = RUN_ALL_TESTS();
+  return result;
+}