#include "common.h"
-#include <cds/container/chase-lev-deque.h>
+#include <cds/misc/chase-lev-deque.h>
#include <cds_test/stress_test.h>
#include <cstdlib>
#include <ctime>
#include <iostream>
+#include <memory>
#include <thread>
using namespace std;
namespace {
typedef cds_others::ChaseLevDeque Deque;
-static size_t s_nDequeStealerThreadCount = 5;
+static size_t s_nDequePushPercentage = 70;
+static size_t s_nDequeStealerThreadCount = 3;
static size_t s_nDequeMainPassCount = 100000000;
+static size_t s_nInitialDequePushPassCount = 10000;
class ChaseLevDequeTest : public cds_test::stress_fixture {
protected:
static Deque *deque;
- static atomic_int terminate_stealer;
+ static atomic_bool pusher_done;
static ullong *sums;
static ullong *succ_counts;
static ullong push_sum;
static void SetUpTestCase() {
cds_test::config const &cfg = get_config("Misc");
+ GetConfig(DequePushPercentage);
GetConfig(DequeStealerThreadCount);
GetConfig(DequeMainPassCount);
+ GetConfig(InitialDequePushPassCount);
}
static void StealerThread(int index) {
- while (!terminate_stealer.load(memory_order_relaxed)) {
- int res = deque->steal();
- if (res != EMPTY && res != ABORT) {
- sums[index] += res;
- succ_counts[index]++;
+ while (!pusher_done.load(memory_order_acquire)) {
+ while (true) {
+ int res = deque->steal();
+ if (res != EMPTY && res != ABORT) {
+ sums[index] += res;
+ succ_counts[index]++;
+ } else {
+ break;
+ }
}
}
}
static void MainThread(int index, int push_percentage) {
- for (ullong i = 0; i < s_nDequeMainPassCount; i++) {
- if ((::rand() % 100) < push_percentage) {
- int item = ::rand() % 100;
+ for (size_t i = 0; i < s_nInitialDequePushPassCount; i++) {
+ deque->push(i + 1);
+ push_sum += i + 1;
+ push_count++;
+ }
+ for (size_t i = 0; i < s_nDequeMainPassCount; i++) {
+ if (rand(100) < push_percentage) {
+ int item = rand(s_nDequeMainPassCount);
+ if (item == EMPTY || item == ABORT) {
+ item = 1;
+ }
deque->push(item);
push_sum += item;
push_count++;
}
}
}
- // while (true) {
- // int res = deque->take();
- // if (res != EMPTY) {
- // sums[index] += res;
- // succ_counts[index]++;
- // } else {
- // break;
- // }
- // }
+ // Try take() until we don't can't take any more.
+ while (true) {
+ int res = deque->take();
+ if (res != EMPTY) {
+ sums[index] += res;
+ succ_counts[index]++;
+ } else {
+ break;
+ }
+ }
}
};
-atomic_int ChaseLevDequeTest::terminate_stealer;
+atomic_bool ChaseLevDequeTest::pusher_done;
ullong *ChaseLevDequeTest::sums;
ullong *ChaseLevDequeTest::succ_counts;
ullong ChaseLevDequeTest::push_count;
sums = (ullong *)calloc(1, sizeof(ullong) * (s_nDequeStealerThreadCount + 1));
succ_counts =
(ullong *)calloc(1, sizeof(ullong) * (s_nDequeStealerThreadCount + 1));
- srand(time(NULL));
// Stealer threads
- std::thread *threads = new std::thread[s_nDequeStealerThreadCount];
+ std::unique_ptr<std::thread[]> threads(
+ new std::thread[s_nDequeStealerThreadCount]);
for (ullong i = 0; i < s_nDequeStealerThreadCount; i++) {
threads[i] = std::thread(StealerThread, i);
}
- for (int i = 90; i > 0; i -= 10) {
- MainThread(s_nDequeStealerThreadCount, i);
- }
+ MainThread(s_nDequeStealerThreadCount, s_nDequePushPercentage);
+ pusher_done.store(true, memory_order_release);
- terminate_stealer.store(1, memory_order_relaxed);
for (ullong i = 0; i < s_nDequeStealerThreadCount; i++) {
threads[i].join();
}
received_sum += sums[i];
overall_count += succ_counts[i];
}
- cout << "Sum of push: " << push_sum << "\n";
- cout << "Received sum:" << received_sum << "\n";
- cout << "overall_count:" << overall_count << "\n";
- cout << "push_count=" << push_count << "\n";
+ if (overall_count != push_count || received_sum != push_sum) {
+ cout << "Incorrect deque\n";
+ cout << "Push sum: " << push_sum << "\n";
+ cout << "Received sum:" << received_sum << "\n";
+ cout << "Push count=" << push_count << "\n";
+ cout << "Received count:" << overall_count << "\n";
+ }
+
+ free(sums);
+ free(succ_counts);
+ delete deque;
}
} // namespace