-#include <folly/concurrency/UnboundedQueue.h>
-#include <folly/concurrency/DynamicBoundedQueue.h>
-#include <folly/AtomicLinkedList.h>
-#include <folly/MPMCQueue.h>
+#include "queue_test.h"
+
+namespace folly_test {
+
+class FollyQueueEnqueueDequeueTest_Sequential
+ : public cds_test::stress_fixture {
+protected:
+ // Unbounded queue
+ static size_t s_nUnboundedQueueEnqueueStride;
+ static size_t s_nUSPSCQueueEnqueueCount;
+ static size_t s_nUMPSCQueueEnqueueCount;
+ static size_t s_nUSPMCQueueEnqueueCount;
+ static size_t s_nUMPMCQueueEnqueueCount;
+
+ // Dynamic bounded queue
+ static size_t s_nDynamicBoundedQueueEnqueueStride;
+ static size_t s_nDynamicBoundedQueueCapacity;
+ static size_t s_nDSPSCQueueEnqueueCount;
+ static size_t s_nDMPSCQueueEnqueueCount;
+ static size_t s_nDSPMCQueueEnqueueCount;
+ static size_t s_nDMPMCQueueEnqueueCount;
-#include <chrono>
-#include <cassert>
-#include <iostream>
-#include <memory>
-
-namespace {
-
-const char* kTestName = "EnqueueDequeue";
-
-// Unbounded queue
-size_t kUnboundedQueueEnqueueStride = 10000;
-size_t kUSPSCQueueEnqueueCount = 1200000000;
-const char* kUSPSCQueueBenchmarkName = "FollyUnboundedQueue_SPSC";
-size_t kUMPSCQueueEnqueueCount = 320000000;
-const char* kUMPSCQueueBenchmarkName = "FollyUnboundedQueue_MPSC";
-size_t kUSPMCQueueEnqueueCount = 320000000;
-const char* kUSPMCQueueBenchmarkName = "FollyUnboundedQueue_SPMC";
-size_t kUMPMCQueueEnqueueCount = 320000000;
-const char* kUMPMCQueueBenchmarkName = "FollyUnboundedQueue_MPMC";
-
-typedef folly::USPSCQueue<size_t, false> USPSCQueue;
-typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
-typedef folly::USPMCQueue<size_t, false> USPMCQueue;
-typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
-
-// Dynamic bounded queue
-size_t kDynamicBoundedQueueEnqueueStride = 50000;
-size_t kDynamicBoundedQueueCapacity = 200000;
-size_t kDSPSCQueueEnqueueCount = 1200000000;
-const char* kDSPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPSC";
-size_t kDMPSCQueueEnqueueCount = 320000000;
-const char* kDMPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPSC";
-size_t kDSPMCQueueEnqueueCount = 320000000;
-const char* kDSPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPMC";
-size_t kDMPMCQueueEnqueueCount = 320000000;
-const char* kDMPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPMC";
-
-typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
-typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
-typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
-typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
-
-// AtomicLinkedList
-size_t kAtomicLinkedListSize = 50000;
-size_t kAtomicLinkedListPassCount = 10000;
-const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
-typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
-
-// MPMC Queue (linearizable)
-size_t kMPMCQueueEnqueueStride = 10000;
-size_t kMPMCQueueCapacity = 50000;
-size_t kMPMCQueueEnqueueCount = 500000000;
-const char* kMPMCQueueBenchmarkName = "FollyMPMCQueue";
-typedef folly::MPMCQueue<size_t> MPMCQueue;
+ // AtomicLinkedList
+ static size_t s_nAtomicLinkedListSize;
+ static size_t s_nAtomicLinkedListPassCount;
+
+ // MPMC Queue (linearizable)
+ static size_t s_nMPMCQueueEnqueueStride;
+ static size_t s_nMPMCQueueCapacity;
+ static size_t s_nMPMCQueueEnqueueCount;
+
+ static void SetUpTestCase() {
+ const cds_test::config &cfg = get_config("SequentialFollyQueue");
+ // Unbounded queue
+ GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
+ GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
+ GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
+ // Dynamic bounded queue
+ GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
+ GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
+ GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
+ GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
+ // AtomicLinkedList
+ GetConfigNonZeroExpected(AtomicLinkedListSize, 50000);
+ GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
+ // MPMC Queue (linearizable)
+ GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
+ GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
+ GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
+ }
-}
+ static void run_atomic_linkedlist() {
+ for (size_t pass = 0; pass < s_nAtomicLinkedListPassCount; pass++) {
+ std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
+ bool in_order = true;
+ for (size_t i = 0; i < s_nAtomicLinkedListSize; i++) {
+ list->insertHead(i);
+ }
+ size_t nSum = 0;
+ auto func = [&nSum](size_t elem) { nSum += elem; };
+ if (in_order) {
+ list->sweep(func);
+ } else {
+ list->reverseSweep(func);
+ }
+ in_order = !in_order;
-void run_atomic_linkedlist() {
- std::cout << "[ RUN ] " << kTestName << "."
- << kAtomicLinkedListBenchmarkName << std::endl;
- auto start_time = std::chrono::system_clock::now();
- for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
- std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
- bool in_order = true;
- for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
- list->insertHead(i);
- }
- size_t nSum = 0;
- auto func = [&nSum] (size_t elem) { nSum += elem; };
- if (in_order) {
- list->sweep(func);
- } else {
- list->reverseSweep(func);
- }
- in_order = !in_order;
-
- size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
- if (nSum != supposed_sum) {
- std::cout << "Sequential linked list pop sum: " << nSum
- << " != " << supposed_sum << "\n";
- auto finish_time = std::chrono::system_clock::now();
- auto dur = finish_time - start_time;
- auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
- std::cout << "[ FAILED ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
- << " (" << milisecs.count() << " ms)" << std::endl;
- assert(false && "Folly AtomicLinkedList ERROR");
+ size_t supposed_sum =
+ s_nAtomicLinkedListSize * (s_nAtomicLinkedListSize - 1) / 2;
+ EXPECT_EQ(nSum, supposed_sum);
}
}
- auto finish_time = std::chrono::system_clock::now();
- auto dur = finish_time - start_time;
- auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
- std::cout << "[ OK ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
- << " (" << milisecs.count() << " ms)" << std::endl;
-}
-
-template <typename Queue>
-void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
- size_t enqueue_stride) {
- std::cout << "[ RUN ] " << kTestName << "." << bench_name << std::endl;
- auto start_time = std::chrono::system_clock::now();
+ template <typename Queue>
+ static void run_queue(Queue *q, size_t enqueue_count, size_t enqueue_stride) {
size_t nNo = 0;
size_t pop_sum = 0;
while (nNo < enqueue_count) {
- size_t curr_push_count =
- std::min(enqueue_count - nNo, enqueue_stride);
+ size_t curr_push_count = std::min(enqueue_count - nNo, enqueue_stride);
for (size_t i = 0; i < curr_push_count; i++) {
q->enqueue(nNo++);
}
pop_sum += res;
}
}
+ size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+ EXPECT_EQ(pop_sum, supposed_sum);
+ }
- auto finish_time = std::chrono::system_clock::now();
- auto dur = finish_time - start_time;
- auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
+ template <typename Queue>
+ static void run_without_initial_capacity(size_t enqueue_count,
+ size_t enqueue_stride) {
+ std::unique_ptr<Queue> q(new Queue());
+ run_queue(q.get(), enqueue_count, enqueue_stride);
+ }
- size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
- if (pop_sum != supposed_sum) {
- std::cout << "Sequential queue pop sum: " << pop_sum
- << " != " << supposed_sum << "\n";
- std::cout << "[ FAILED ] " << kTestName << "." << bench_name
- << " (" << milisecs.count() << " ms)" << std::endl;
- assert(false && "Folly concurrent queue ERROR");
- } else {
- std::cout << "[ OK ] " << kTestName << "." << bench_name
- << " (" << milisecs.count() << " ms)" << std::endl;
- }
-}
+ template <typename Queue>
+ static void run_with_initial_capacity(size_t queue_capacity,
+ size_t enqueue_count,
+ size_t enqueue_stride) {
+ std::unique_ptr<Queue> q(new Queue(queue_capacity));
+ run_queue(q.get(), enqueue_count, enqueue_stride);
+ }
+};
// MPMC Specialization.
template <>
-void run_queue(MPMCQueue* q, size_t enqueue_count, const char* bench_name,
- size_t enqueue_stride) {
- std::cout << "[ RUN ] " << kTestName << "." << bench_name << std::endl;
- auto start_time = std::chrono::system_clock::now();
-
- size_t nNo = 0;
- size_t push_sum = 0;
- size_t pop_sum = 0;
- while (nNo < enqueue_count) {
- size_t curr_push_count =
- std::min(enqueue_count - nNo, enqueue_stride);
- for (size_t i = 0; i < curr_push_count; i++) {
- if (q->write(nNo)) {
- push_sum += nNo;
- nNo++;
- }
- }
- size_t res;
- while (q->read(res)) {
- pop_sum += res;
+void FollyQueueEnqueueDequeueTest_Sequential::run_queue(MPMCQueue *q,
+ size_t enqueue_count,
+ size_t enqueue_stride) {
+ size_t nNo = 0;
+ size_t push_sum = 0;
+ size_t pop_sum = 0;
+ while (nNo < enqueue_count) {
+ size_t curr_push_count = std::min(enqueue_count - nNo, enqueue_stride);
+ for (size_t i = 0; i < curr_push_count; i++) {
+ if (q->write(nNo)) {
+ push_sum += nNo;
+ nNo++;
}
}
+ size_t res;
+ while (q->read(res)) {
+ pop_sum += res;
+ }
+ }
- auto finish_time = std::chrono::system_clock::now();
- auto dur = finish_time - start_time;
- auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
+ size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+ EXPECT_EQ(pop_sum, supposed_sum);
+}
- size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
- if (pop_sum != supposed_sum) {
- std::cout << "Sequential queue pop sum: " << pop_sum
- << " != " << supposed_sum << "\n";
- std::cout << "[ FAILED ] " << kTestName << "." << bench_name
- << " (" << milisecs.count() << " ms)" << std::endl;
- assert(false && "Folly concurrent queue ERROR");
- } else {
- std::cout << "[ OK ] " << kTestName << "." << bench_name
- << " (" << milisecs.count() << " ms)" << std::endl;
- }
+// Unbounded queue
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUnboundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nUMPMCQueueEnqueueCount;
+// Dynamic bounded queue
+size_t FollyQueueEnqueueDequeueTest_Sequential::
+ s_nDynamicBoundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDynamicBoundedQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nDMPMCQueueEnqueueCount;
+// AtomicLinkedList
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nAtomicLinkedListSize;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nAtomicLinkedListPassCount;
+// MPMC Queue (linearizable)
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Sequential::s_nMPMCQueueEnqueueCount;
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyMPMCQueue) {
+ run_with_initial_capacity<MPMCQueue>(s_nMPMCQueueCapacity,
+ s_nMPMCQueueEnqueueCount,
+ s_nMPMCQueueEnqueueStride);
}
-template <typename Queue>
-void run_without_initial_capacity(size_t enqueue_count, const char* bench_name,
- size_t enqueue_stride) {
- std::unique_ptr<Queue> q(new Queue());
- run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyAtomicLinkedList) {
+ run_atomic_linkedlist();
}
-template <typename Queue>
-void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
- const char* bench_name, size_t enqueue_stride) {
- std::unique_ptr<Queue> q(new Queue(queue_capacity));
- run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_SPSC) {
+ run_without_initial_capacity<USPSCQueue>(s_nUSPSCQueueEnqueueCount,
+ s_nUnboundedQueueEnqueueStride);
}
-int main() {
- // MPMCQueue
- run_with_initial_capacity<MPMCQueue>(
- kMPMCQueueCapacity ,
- kMPMCQueueEnqueueCount,
- kMPMCQueueBenchmarkName,
- kMPMCQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_MPSC) {
+ run_without_initial_capacity<UMPSCQueue>(s_nUMPSCQueueEnqueueCount,
+ s_nUnboundedQueueEnqueueStride);
+}
- // AtomicLinkedList
- run_atomic_linkedlist();
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_SPMC) {
+ run_without_initial_capacity<USPMCQueue>(s_nUSPMCQueueEnqueueCount,
+ s_nUnboundedQueueEnqueueStride);
+}
- // UnboundedQueue
- run_without_initial_capacity<USPSCQueue>(
- kUSPSCQueueEnqueueCount,
- kUSPSCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_without_initial_capacity<UMPSCQueue>(
- kUMPSCQueueEnqueueCount,
- kUMPSCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_without_initial_capacity<USPMCQueue>(
- kUSPMCQueueEnqueueCount,
- kUSPMCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_without_initial_capacity<UMPMCQueue>(
- kUMPMCQueueEnqueueCount,
- kUMPMCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyUnboundedQueue_MPMC) {
+ run_without_initial_capacity<UMPMCQueue>(s_nUMPMCQueueEnqueueCount,
+ s_nUnboundedQueueEnqueueStride);
+}
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_SPSC) {
// DynamicBoundedQueue
- run_with_initial_capacity<DSPSCQueue>(
- kDynamicBoundedQueueCapacity ,
- kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_with_initial_capacity<DMPSCQueue>(
- kDynamicBoundedQueueCapacity,
- kDMPSCQueueEnqueueCount,
- kDMPSCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_with_initial_capacity<DSPMCQueue>(
- kDynamicBoundedQueueCapacity,
- kDSPMCQueueEnqueueCount,
- kDSPMCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_with_initial_capacity<DMPMCQueue>(
- kDynamicBoundedQueueCapacity,
- kDMPMCQueueEnqueueCount,
- kDMPMCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
-
- return 0;
+ run_with_initial_capacity<DSPSCQueue>(s_nDynamicBoundedQueueCapacity,
+ s_nDSPSCQueueEnqueueCount,
+ s_nDynamicBoundedQueueEnqueueStride);
}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_MPSC) {
+ run_with_initial_capacity<DMPSCQueue>(s_nDynamicBoundedQueueCapacity,
+ s_nDMPSCQueueEnqueueCount,
+ s_nDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_SPMC) {
+ run_with_initial_capacity<DSPMCQueue>(s_nDynamicBoundedQueueCapacity,
+ s_nDSPMCQueueEnqueueCount,
+ s_nDynamicBoundedQueueEnqueueStride);
+}
+
+TEST_F(FollyQueueEnqueueDequeueTest_Sequential, FollyDynamicBoundedQueue_MPMC) {
+ run_with_initial_capacity<DMPMCQueue>(s_nDynamicBoundedQueueCapacity,
+ s_nDMPMCQueueEnqueueCount,
+ s_nDynamicBoundedQueueEnqueueStride);
+}
+
+} // namespace folly_test