From: Peizhao Ou
Date: Fri, 8 Dec 2017 01:18:58 +0000 (-0800)
Subject: Merge branch 'master' of /scratch/mine/libcds
X-Git-Url: http://plrg.eecs.uci.edu/git/?p=libcds.git;a=commitdiff_plain;h=df58fae11f80f03ca5b50c800f2978b1ff08efea;hp=05d6a6a58c740cf12f2580473724572a77d33b81

Merge branch 'master' of /scratch/mine/libcds
---

diff --git a/cds/sync/backoff.h b/cds/sync/backoff.h
new file mode 100644
index 00000000..bcbcec50
--- /dev/null
+++ b/cds/sync/backoff.h
@@ -0,0 +1,17 @@
+#ifndef _BACKOFF_H
+#define _BACKOFF_H
+
+#include <cds/algo/backoff_strategy.h>
+
+namespace cds_others {
+
+namespace bkoff = cds::backoff;
+struct BackoffTraits : public bkoff::exponential_const_traits {
+  static size_t lower_bound;
+  static size_t upper_bound;
+};
+typedef bkoff::exponential<BackoffTraits> ExpBackoff;
+
+} // namespace cds_others
+
+#endif
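
Reviewer note, not part of the patch: a minimal sketch of how the ExpBackoff typedef above is meant to be used in a spin loop. The flag, the function name, and the include path are made up for illustration (it assumes the cds/ tree is on the include path), and the traits' lower_bound/upper_bound must be defined in some translation unit, as mcs-lock.h does below.

    // Usage sketch only; not included in this commit.
    #include <atomic>
    #include <cds/sync/backoff.h>

    std::atomic<bool> ready{false};

    void spin_until_ready() {
      cds_others::ExpBackoff backoff;
      // backoff() spins, growing the wait between the traits' lower_bound
      // and upper_bound, which keeps the polling loop polite under load.
      while (!ready.load(std::memory_order_acquire)) {
        backoff();
      }
    }
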
diff --git a/cds/sync/barrier.h b/cds/sync/barrier.h
new file mode 100644
index 00000000..6d0366c7
--- /dev/null
+++ b/cds/sync/barrier.h
@@ -0,0 +1,58 @@
+#ifndef _BARRIER_H
+#define _BARRIER_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class SpinBarrier {
+public:
+  SpinBarrier(unsigned int n) : n_(n) {
+    nwait_ = 0;
+    step_ = 0;
+  }
+
+  // The purpose of wait() is that threads entering it synchronize with
+  // the other threads by the time they get out of it.
+  /** wildcard(2) is acq_rel, ensuring that all threads happen-before
+   * other threads along the RMW chain; wildcards (4) and (5) are
+   * release/acquire so that the last thread synchronizes with all of
+   * the earlier threads. The (4)/(5) synchronization also ensures that
+   * the reset of nwait_ in wildcard(3) happens before any later use of
+   * the barrier by other threads.
+   */
+
+  bool wait() {
+    unsigned int step = step_.load(std::memory_order_relaxed);
+
+    if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
+      /* OK, last thread to come. */
+      nwait_.store(0, std::memory_order_relaxed);
+      step_.fetch_add(1, std::memory_order_release);
+      return true;
+    } else {
+      ExpBackoff backoff;
+      /* Run in circles and scream like a little girl. */
+      while (step_.load(std::memory_order_acquire) == step) {
+        backoff();
+      }
+      return false;
+    }
+  }
+
+protected:
+  /* Number of synchronized threads. */
+  const unsigned int n_;
+
+  /* Number of threads currently spinning. */
+  std::atomic<unsigned int> nwait_;
+
+  /* Number of barrier synchronizations completed so far;
+   * it's OK to wrap. */
+  std::atomic<unsigned int> step_;
+};
+
+} // namespace cds_others
+
+#endif
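
A quick usage sketch for SpinBarrier (not part of the patch): every thread calls wait() in lock-step rounds, and exactly one thread per round, the last to arrive, sees true. The header path, thread count, round count, and counter below are illustrative only.

    // Usage sketch only; not included in this commit.
    #include <atomic>
    #include <cds/sync/barrier.h>
    #include <thread>
    #include <vector>

    void barrier_demo() {
      const unsigned int kThreads = 4;
      const int kRounds = 1000;
      cds_others::SpinBarrier barrier(kThreads);
      std::atomic<int> rounds{0};

      std::vector<std::thread> workers;
      for (unsigned int t = 0; t < kThreads; ++t) {
        workers.emplace_back([&] {
          for (int i = 0; i < kRounds; ++i) {
            if (barrier.wait()) {
              // Only the last thread to arrive in each round gets true.
              rounds.fetch_add(1, std::memory_order_relaxed);
            }
          }
        });
      }
      for (std::thread &w : workers) {
        w.join();
      }
      // rounds.load() == kRounds at this point.
    }
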
diff --git a/cds/sync/mcs-lock.h b/cds/sync/mcs-lock.h
new file mode 100644
index 00000000..d5cda3a6
--- /dev/null
+++ b/cds/sync/mcs-lock.h
@@ -0,0 +1,108 @@
+#ifndef _MCS_LOCK_H
+#define _MCS_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+
+namespace cds_others {
+
+size_t BackoffTraits::lower_bound = 16;
+size_t BackoffTraits::upper_bound = 1024;
+
+// Forward declaration
+struct mcs_node;
+struct mcs_mutex;
+
+struct mcs_node {
+  std::atomic<mcs_node *> next;
+  std::atomic<int> gate;
+
+  mcs_node() {
+    next.store(0);
+    gate.store(0);
+  }
+};
+
+struct mcs_mutex {
+public:
+  // tail is null when lock is not held
+  std::atomic<mcs_node *> m_tail;
+
+  mcs_mutex() { m_tail.store(nullptr); }
+  ~mcs_mutex() { assert(m_tail.load() == nullptr); }
+
+  class guard {
+  public:
+    mcs_mutex *m_t;
+    mcs_node m_node; // node held on the stack
+
+    // Call the wrapper (instrument every lock/unlock)
+    guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
+    ~guard() { m_t->unlock(this); }
+  };
+
+  void lock(guard *I) {
+    mcs_node *me = &(I->m_node);
+
+    // set up my node:
+    // not published yet so relaxed:
+    me->next.store(nullptr, std::memory_order_relaxed);
+    me->gate.store(1, std::memory_order_relaxed);
+
+    // publish my node as the new tail:
+    mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
+    if (pred != nullptr) {
+      // (*1) race here
+      // unlock of pred can see me in the tail before I fill next
+
+      // If this store were relaxed, the unlocker's store of 0 to gate could
+      // be ordered before our store of 1, and this lock() would never return.
+      // publish me to previous lock-holder:
+      pred->next.store(me, std::memory_order_release);
+
+      // (*2) pred not touched any more
+
+      // now this is the spin -
+      // wait on predecessor setting my flag -
+      ExpBackoff backoff;
+      while (me->gate.load(std::memory_order_acquire)) {
+        backoff();
+      }
+    }
+  }
+
+  void unlock(guard *I) {
+    mcs_node *me = &(I->m_node);
+    mcs_node *next = me->next.load(std::memory_order_acquire);
+    if (next == nullptr) {
+      mcs_node *tail_was_me = me;
+
+      // This was mo_acq_rel, which is stronger than necessary
+      if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
+                                         std::memory_order_release)) {
+        // got null in tail, mutex is unlocked
+        return;
+      }
+
+      // (*1) catch the race:
+      ExpBackoff backoff;
+      for (;;) {
+        next = me->next.load(std::memory_order_acquire);
+        if (next != nullptr)
+          break;
+        backoff();
+      }
+    }
+
+    // (*2) - store to next must be done,
+    // so no locker can be viewing my node any more
+
+    next->gate.store(0, std::memory_order_release);
+  }
+};
+
+} // namespace cds_others
+
+#endif
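
Another sketch that is not part of the patch: the intended way to use mcs_mutex is through the stack-allocated guard, which queues the caller's node behind the current holder and releases the lock when it goes out of scope. The global, the function name, and the include path are illustrative.

    // Usage sketch only; not included in this commit.
    #include <cds/sync/mcs-lock.h>

    cds_others::mcs_mutex g_lock;
    int g_counter = 0;

    void increment() {
      // The guard's constructor enqueues this thread's mcs_node and spins
      // (with exponential backoff) until the predecessor hands the lock over.
      cds_others::mcs_mutex::guard g(&g_lock);
      ++g_counter; // critical section
    } // the guard's destructor unlocks and wakes the successor, if any
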
diff --git a/cds/sync/rwlock.h b/cds/sync/rwlock.h
new file mode 100644
index 00000000..e8c6465e
--- /dev/null
+++ b/cds/sync/rwlock.h
@@ -0,0 +1,93 @@
+#ifndef _RWLOCK_H
+#define _RWLOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include
+#include
+
+namespace cds_others {
+
+#define RW_LOCK_BIAS 0x00100000
+#define WRITE_LOCK_CMP RW_LOCK_BIAS
+
+using std::memory_order_acquire;
+using std::memory_order_release;
+using std::memory_order_relaxed;
+using std::atomic_int;
+
+class RWLock {
+public:
+  RWLock() {
+    lock.store(RW_LOCK_BIAS);
+  }
+
+  int read_can_lock() {
+    return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
+  }
+
+  int write_can_lock() {
+    return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
+  }
+
+  void read_lock() {
+    ExpBackoff backoff;
+    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    while (priorvalue <= 0) {
+      atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+      while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
+        backoff();
+      }
+      priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    }
+  }
+
+  void write_lock() {
+    int priorvalue =
+        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    ExpBackoff backoff;
+    while (priorvalue != RW_LOCK_BIAS) {
+      atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+      while (atomic_load_explicit(&lock, memory_order_relaxed) !=
+             RW_LOCK_BIAS) {
+        backoff();
+      }
+      priorvalue =
+          atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    }
+  }
+
+  int read_trylock() {
+    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    if (priorvalue > 0)
+      return 1;
+
+    atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+    return 0;
+  }
+
+  int write_trylock() {
+    int priorvalue =
+        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    if (priorvalue == RW_LOCK_BIAS)
+      return 1;
+
+    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+    return 0;
+  }
+
+  void read_unlock() {
+    atomic_fetch_add_explicit(&lock, 1, memory_order_release);
+  }
+
+  void write_unlock() {
+    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
+  }
+
+private:
+  atomic_int lock;
+};
+
+} // namespace cds_others
+
+#endif
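
A small usage sketch (again, not part of the patch) for the biased-counter RWLock above; it mirrors the trylock-then-lock fallback pattern that the stress drivers below use. Function names, the global, and the include path are made up.

    // Usage sketch only; not included in this commit.
    #include <cds/sync/rwlock.h>

    cds_others::RWLock g_rw;
    int g_value = 0;

    int read_value() {
      // read_trylock() returns 1 on success, 0 on failure.
      if (!g_rw.read_trylock()) {
        g_rw.read_lock();
      }
      int v = g_value;
      g_rw.read_unlock();
      return v;
    }

    void write_value(int v) {
      if (!g_rw.write_trylock()) {
        g_rw.write_lock();
      }
      g_value = v;
      g_rw.write_unlock();
    }
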
diff --git a/cds/sync/seqlock.h b/cds/sync/seqlock.h
new file mode 100644
index 00000000..a84c0c43
--- /dev/null
+++ b/cds/sync/seqlock.h
@@ -0,0 +1,62 @@
+#ifndef _SEQLOCK_H
+#define _SEQLOCK_H
+
+#include <atomic>
+
+namespace cds_others {
+
+using std::atomic_int;
+using std::memory_order_release;
+using std::memory_order_acquire;
+using std::memory_order_relaxed;
+
+class SeqLock {
+private:
+  // Sequence for reader consistency check.
+  atomic_int seq_;
+  // The protected data; it needs to be atomic to avoid data races.
+  atomic_int data_;
+
+public:
+  SeqLock() {
+    atomic_init(&seq_, 0);
+    atomic_init(&data_, 0);
+  }
+
+  int read() {
+    while (true) {
+      int old_seq = seq_.load(memory_order_acquire); // acquire
+      if (old_seq % 2 == 1)
+        continue;
+
+      int res = data_.load(memory_order_acquire); // acquire
+      if (seq_.load(memory_order_relaxed) == old_seq) { // relaxed
+        return res;
+      }
+    }
+  }
+
+  void write(int new_data) {
+    while (true) {
+      // This load might be relaxed too.
+      int old_seq = seq_.load(memory_order_acquire); // acquire
+      if (old_seq % 2 == 1)
+        continue; // Retry
+
+      // This CAS should be relaxed.
+      if (seq_.compare_exchange_strong(old_seq, old_seq + 1,
+                                       memory_order_relaxed,
+                                       memory_order_relaxed)) // relaxed
+        break;
+    }
+
+    // Update the data
+    data_.store(new_data, memory_order_release); // release
+
+    seq_.fetch_add(1, memory_order_release); // release
+  }
+};
+
+} // namespace cds_others
+
+#endif
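
One more sketch that is not in the patch: the writer/reader pairing for SeqLock. Readers retry inside read() until they observe an even, unchanged sequence number; concurrent writers are serialized by the CAS on seq_. The names and include path are illustrative.

    // Usage sketch only; not included in this commit.
    #include <cds/sync/seqlock.h>

    cds_others::SeqLock g_seqlock;

    void publish(int value) {
      // Bumps seq_ to odd, stores the data, then bumps seq_ back to even.
      g_seqlock.write(value);
    }

    int snapshot() {
      // Retries internally until a consistent value has been read.
      return g_seqlock.read();
    }
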
diff --git a/test/stress/CMakeLists.txt b/test/stress/CMakeLists.txt
index f269711a..b7336370 100644
--- a/test/stress/CMakeLists.txt
+++ b/test/stress/CMakeLists.txt
@@ -21,6 +21,7 @@ include_directories(
     ${CMAKE_CURRENT_SOURCE_DIR}
 )
 
+add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/misc)
 add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/freelist)
 add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/map)
 add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/pqueue)
@@ -30,6 +31,7 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/stack)
 
 add_custom_target( stress-all
     DEPENDS
+        stress-misc
         stress-freelist
         stress-map
         stress-pqueue
diff --git a/test/stress/misc/CMakeLists.txt b/test/stress/misc/CMakeLists.txt
new file mode 100644
index 00000000..75a81785
--- /dev/null
+++ b/test/stress/misc/CMakeLists.txt
@@ -0,0 +1,19 @@
+set(PACKAGE_NAME stress-misc)
+
+set(CDSSTRESS_MISC_SOURCES
+    ../main.cpp
+    barrier_driver.cpp
+    seqlock_driver.cpp
+    rwlock_driver.cpp
+    mcslock_driver.cpp
+    spinlock_driver.cpp
+)
+
+include_directories(
+    ${CMAKE_CURRENT_SOURCE_DIR}
+)
+
+add_executable(${PACKAGE_NAME} ${CDSSTRESS_MISC_SOURCES})
+target_link_libraries(${PACKAGE_NAME} ${CDS_TEST_LIBRARIES} ${CDSSTRESS_FRAMEWORK_LIBRARY})
+
+add_test(NAME ${PACKAGE_NAME} COMMAND ${PACKAGE_NAME} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
diff --git a/test/stress/misc/barrier_driver.cpp b/test/stress/misc/barrier_driver.cpp
new file mode 100644
index 00000000..e21f9f7a
--- /dev/null
+++ b/test/stress/misc/barrier_driver.cpp
@@ -0,0 +1,56 @@
+#include <cds/sync/barrier.h>
+#include <cds_test/stress_test.h>
+#include <cassert>
+#include <thread>
+#include
+#include
+#include
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::SpinBarrier Barrier;
+
+class BarrierTest : public cds_test::stress_fixture {
+protected:
+  static Barrier *barrier;
+  static int count;
+  static const int kThreads = 6;
+  static const int kPassCount = 10000;
+
+  static void SetUpTestCase() {}
+
+  static void TearDownTestCase() {
+    assert(count == kPassCount * kPassCount);
+  }
+
+  static void Thread() {
+    for (int i = 0; i < kPassCount; i++) {
+      for (int j = 0; j < kPassCount; j++) {
+        if (barrier->wait()) {
+          count++;
+        }
+      }
+    }
+  }
+};
+
+Barrier *BarrierTest::barrier;
+int BarrierTest::count;
+const int BarrierTest::kThreads;
+
+TEST_F(BarrierTest, Wait) {
+  barrier = new Barrier(kThreads);
+  int num_threads = kThreads;
+  std::thread *threads = new std::thread[num_threads];
+  for (int i = 0; i < num_threads; i++) {
+    threads[i] = std::thread(Thread);
+  }
+
+  for (int i = 0; i < num_threads; i++) {
+    threads[i].join();
+  }
+}
+
+} // namespace
diff --git a/test/stress/misc/mcslock_driver.cpp b/test/stress/misc/mcslock_driver.cpp
new file mode 100644
index 00000000..e2b6de9a
--- /dev/null
+++ b/test/stress/misc/mcslock_driver.cpp
@@ -0,0 +1,52 @@
+#include <cds/sync/mcs-lock.h>
+#include <cds_test/stress_test.h>
+#include <thread>
+#include
+#include
+#include
+#include
+
+using namespace std;
+
+namespace {
+
+class MCSLockTest : public cds_test::stress_fixture {
+protected:
+  static int x;
+  static cds_others::mcs_mutex *my_mutex;
+  static const int kThreads = 6;
+
+  static void SetUpTestCase() {}
+
+  static void Thread() {
+    cds_others::mcs_mutex::guard g(my_mutex);
+    x = 1;
+    my_mutex->unlock(&g);
+    for (int i = 0; i < 10000; i++) {
+      for (int j = 0; j < 300; j++) {
+        my_mutex->lock(&g);
+        x = i + j;
+        my_mutex->unlock(&g);
+      }
+    }
+    my_mutex->lock(&g);
+  }
+};
+
+int MCSLockTest::x;
+cds_others::mcs_mutex *MCSLockTest::my_mutex;
+const int MCSLockTest::kThreads;
+
+TEST_F(MCSLockTest, BasicLockUnlock) {
+  my_mutex = new cds_others::mcs_mutex();
+  std::thread threads[kThreads];
+  for (int i = 0; i < kThreads; i++) {
+    threads[i] = std::thread(Thread);
+  }
+
+  for (int i = 0; i < kThreads; i++) {
+    threads[i].join();
+  }
+}
+
+} // namespace
diff --git a/test/stress/misc/rwlock_driver.cpp b/test/stress/misc/rwlock_driver.cpp
new file mode 100644
index 00000000..d2dfce90
--- /dev/null
+++ b/test/stress/misc/rwlock_driver.cpp
@@ -0,0 +1,121 @@
+#include <cds/sync/rwlock.h>
+#include <cds_test/stress_test.h>
+#include <cstdlib>
+#include <thread>
+#include
+#include
+#include
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::RWLock RWLock;
+class RWLockTest : public cds_test::stress_fixture {
+protected:
+  static int sum;
+  static int x;
+  static RWLock *rwlock;
+  static const int kReaderThreads = 0;
+  static const int kWriterThreads = 0;
+  static const int kReaderWriterThreads = 6;
+  static const int kWriterPercentage = 20;
+  static const int kRWPassCount = 20000;
+
+  static void SetUpTestCase() {}
+
+  static void ReaderThread() {
+    for (int i = 0; i < 10000; i++) {
+      for (int j = 0; j < 10; j++) {
+        if (rwlock->read_can_lock()) {
+          if (!rwlock->read_trylock()) {
+            rwlock->read_lock();
+          }
+          sum += x;
+          rwlock->read_unlock();
+        } else {
+          rwlock->read_lock();
+          sum += x;
+          rwlock->read_unlock();
+        }
+      }
+    }
+  }
+
+  static void WriterThread() {
+    for (int i = 0; i < 10000; i++) {
+      if (rwlock->write_can_lock()) {
+        if (!rwlock->write_trylock()) {
+          rwlock->write_lock();
+        }
+        x += 1;
+        rwlock->write_unlock();
+      } else {
+        rwlock->write_lock();
+        x += 1;
+        rwlock->write_unlock();
+      }
+    }
+  }
+
+  static void ReaderWriterThread() {
+    for (int i = 0; i < kRWPassCount; i++) {
+      for (int j = 0; j < kRWPassCount; j++) {
+        if (rand() % 100 >= kWriterPercentage) {
+          if (rwlock->read_can_lock()) {
+            if (!rwlock->read_trylock()) {
+              rwlock->read_lock();
+            }
+            sum += x;
+            rwlock->read_unlock();
+          } else {
+            rwlock->read_lock();
+            sum += x;
+            rwlock->read_unlock();
+          }
+        } else {
+          if (rwlock->write_can_lock()) {
+            if (!rwlock->write_trylock()) {
+              rwlock->write_lock();
+            }
+            x += 1;
+            rwlock->write_unlock();
+          } else {
+            rwlock->write_lock();
+            x += 1;
+            rwlock->write_unlock();
+          }
+        }
+      }
+    }
+  }
+};
+
+int RWLockTest::x;
+int RWLockTest::sum;
+RWLock *RWLockTest::rwlock;
+const int RWLockTest::kReaderThreads;
+const int RWLockTest::kWriterThreads;
+const int RWLockTest::kReaderWriterThreads;
+const int RWLockTest::kRWPassCount;
+
+TEST_F(RWLockTest, BasicLockUnlock) {
+  rwlock = new RWLock();
+  int num_threads = kReaderThreads + kWriterThreads + kReaderWriterThreads;
+  std::thread *threads = new std::thread[num_threads];
+  for (int i = 0; i < kReaderThreads; i++) {
+    threads[i] = std::thread(ReaderThread);
+  }
+  for (int i = kReaderThreads; i < (kReaderThreads + kWriterThreads); i++) {
+    threads[i] = std::thread(WriterThread);
+  }
+  for (int i = (kReaderThreads + kWriterThreads); i < num_threads; i++) {
+    threads[i] = std::thread(ReaderWriterThread);
+  }
+
+  for (int i = 0; i < num_threads; i++) {
+    threads[i].join();
+  }
+}
+
+} // namespace
diff --git a/test/stress/misc/rwqueue_driver.cpp b/test/stress/misc/rwqueue_driver.cpp
new file mode 100644
index 00000000..4acb50e3
--- /dev/null
+++ b/test/stress/misc/rwqueue_driver.cpp
@@ -0,0 +1,83 @@
+#include <cds/container/rwqueue.h>
+#include <cstdlib>
+#include <ctime>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+cds::container::RWQueue<int> queue;
+
+void InitQueue() {
+  for (int i = 0; i < 2000000; i++) {
+    queue.enqueue(rand() % 100);
+  }
+}
+
+void ProducerThread() {
+  for (int i = 0; i < 1000000; i++) {
+    for (int j = 0; j < 50; j++) {
+      queue.enqueue(rand() % 100);
+    }
+  }
+}
+
+void ProducerConsumerThread() {
+  unsigned long long sum = 0;
+  int element;
+  for (int i = 0; i < 1000000; i++) {
+    for (int j = 0; j < 50; j++) {
+      if (!queue.empty() && queue.dequeue(element)) {
+        sum += element;
+      }
+      if (j % 2 == 0) {
+        queue.enqueue(rand() % 100);
+      }
+    }
+  }
+}
+
+void ConsumerThread() {
+  int element;
+  unsigned long long sum = 0;
+  int yield_times = 3;
+  while (yield_times > 0) {
+    while (queue.dequeue(element)) {
+      sum += element;
+      yield_times = 3;
+    }
+    std::this_thread::yield();
+    yield_times--;
+  }
+}
+
+int main() {
+  srand(time(NULL));
+  const int kThreads = 6;
+  // Initialize the queue with some elements.
+  InitQueue();
+  cout << "Starting " << kThreads << " threads for RWQueue...\n";
+
+  struct timespec start, finish;
+  double elapsed = 0.0;
+  clock_gettime(CLOCK_MONOTONIC, &start);
+
+  std::thread threads[kThreads];
+  // Producer thread
+  threads[0] = std::thread(ProducerThread);
+  // ProducerConsumer threads
+  for (int i = 1; i < kThreads; i++) {
+    threads[i] = std::thread(ProducerConsumerThread);
+  }
+
+  for (int i = 0; i < kThreads; i++) {
+    threads[i].join();
+  }
+
+  clock_gettime(CLOCK_MONOTONIC, &finish);
+  elapsed = (finish.tv_sec - start.tv_sec);
+  elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
+  cout << "All threads finished.\n";
+  cout << "Time: " << elapsed << " seconds\n";
+  return 0;
+}
diff --git a/test/stress/misc/seqlock_driver.cpp b/test/stress/misc/seqlock_driver.cpp
new file mode 100644
index 00000000..e230de5f
--- /dev/null
+++ b/test/stress/misc/seqlock_driver.cpp
@@ -0,0 +1,69 @@
+#include <cds/sync/seqlock.h>
+#include <cds_test/stress_test.h>
+#include <cstdlib>
+#include <thread>
+#include
+#include
+#include
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::SeqLock SeqLock;
+
+class SeqLockTest : public cds_test::stress_fixture {
+protected:
+  static int sum;
+  static SeqLock *seqlock;
+  static const int kReaderThreads = 0;
+  static const int kWriterThreads = 0;
+  static const int kReaderWriterThreads = 6;
+  static const int kWriterPercentage = 15;
+  static const int kPassCount = 15000;
+
+  static void SetUpTestCase() {}
+
+  static void ReaderThread() {}
+
+  static void WriterThread() {}
+
+  static void ReaderWriterThread() {
+    for (int i = 0; i < kPassCount; i++) {
+      for (int j = 0; j < kPassCount; j++) {
+        if (rand() % 100 < kWriterPercentage) {
+          seqlock->write(rand() % 10);
+        } else {
+          sum += seqlock->read();
+        }
+      }
+    }
+  }
+};
+
+int SeqLockTest::sum;
+SeqLock *SeqLockTest::seqlock;
+const int SeqLockTest::kReaderThreads;
+const int SeqLockTest::kWriterThreads;
+const int SeqLockTest::kReaderWriterThreads;
+
+TEST_F(SeqLockTest, BasicReadWriter) {
+  seqlock = new SeqLock();
+  int num_threads = kReaderThreads + kWriterThreads + kReaderWriterThreads;
+  std::thread *threads = new std::thread[num_threads];
+  for (int i = 0; i < kReaderThreads; i++) {
+    threads[i] = std::thread(ReaderThread);
+  }
+  for (int i = kReaderThreads; i < (kReaderThreads + kWriterThreads); i++) {
+    threads[i] = std::thread(WriterThread);
+  }
+  for (int i = (kReaderThreads + kWriterThreads); i < num_threads; i++) {
+    threads[i] = std::thread(ReaderWriterThread);
+  }
+
+  for (int i = 0; i < num_threads; i++) {
+    threads[i].join();
+  }
+}
+
+} // namespace
diff --git a/test/stress/misc/spinlock_driver.cpp b/test/stress/misc/spinlock_driver.cpp
new file mode 100644
index 00000000..a8d4923f
--- /dev/null
+++ b/test/stress/misc/spinlock_driver.cpp
@@ -0,0 +1,66 @@
+#include <cds/sync/spinlock.h>
+#include <cds_test/stress_test.h>
+#include <thread>
+#include
+#include
+#include
+#include
+
+using namespace std;
+
+namespace {
+
+typedef cds::sync::spin SpinLock;
+typedef cds::sync::reentrant_spin32 Reentrant32;
+typedef cds::sync::reentrant_spin64 Reentrant64;
+
+#define TASK(lock_type, lock_ptr)                                             \
+  static void Thread##lock_type() {                                           \
+    for (int i = 0; i < 100000; i++) {                                        \
+      for (int j = 0; j < 30000; j++) {                                       \
+        lock_ptr->lock();                                                     \
+        x = i + j;                                                            \
+        lock_ptr->unlock();                                                   \
+      }                                                                       \
+    }                                                                         \
+  }
+
+#define LOCK_TEST(lock_type, lock_ptr)                                        \
+  TEST_F(SpinLockTest, lock_type) {                                           \
+    lock_ptr = new lock_type();                                               \
+    std::thread threads[kThreads];                                            \
+    for (int i = 0; i < kThreads; i++) {                                      \
+      threads[i] = std::thread(Thread##lock_type);                            \
+    }                                                                         \
+    for (int i = 0; i < kThreads; i++) {                                      \
+      threads[i].join();                                                      \
+    }                                                                         \
+  }
+
+class SpinLockTest : public cds_test::stress_fixture {
+protected:
+  static int x;
+  static SpinLock *spin_mutex;
+  static Reentrant32 *reentrant_mutex32;
+  static Reentrant64 *reentrant_mutex64;
+
+  static const int kThreads = 6;
+
+  static void SetUpTestCase() {}
+
+  TASK(SpinLock, spin_mutex)
+  TASK(Reentrant32, reentrant_mutex32)
+  TASK(Reentrant64, reentrant_mutex64)
+};
+
+int SpinLockTest::x;
+const int SpinLockTest::kThreads;
+SpinLock *SpinLockTest::spin_mutex;
+Reentrant32 *SpinLockTest::reentrant_mutex32;
+Reentrant64 *SpinLockTest::reentrant_mutex64;
+
+LOCK_TEST(SpinLock, spin_mutex)
+LOCK_TEST(Reentrant32, reentrant_mutex32)
+LOCK_TEST(Reentrant64, reentrant_mutex64)
+
+} // namespace
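
Final reviewer note, not part of the patch: the cds::sync spin locks exercised by the driver above expose plain lock()/unlock(), so they satisfy BasicLockable; if the explicit pairing in the TASK macro ever becomes error-prone, std::lock_guard can wrap them. A minimal sketch, assuming the usual libcds header location, with made-up names:

    // Usage sketch only; not included in this commit.
    #include <cds/sync/spinlock.h>
    #include <mutex>

    cds::sync::spin g_spin;
    int g_shared = 0;

    void bump() {
      std::lock_guard<cds::sync::spin> hold(g_spin); // unlocks on scope exit
      ++g_shared;
    }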