X-Git-Url: http://plrg.eecs.uci.edu/git/?p=libcds.git;a=blobdiff_plain;f=cds%2Fcontainer%2Fchase-lev-deque.h;fp=cds%2Fcontainer%2Fchase-lev-deque.h;h=4f8125842092cd8bc549b0dbda4090713737b4f9;hp=0000000000000000000000000000000000000000;hb=197419a18f00be2e7478e07ab03025856cd5bf34;hpb=df58fae11f80f03ca5b50c800f2978b1ff08efea diff --git a/cds/container/chase-lev-deque.h b/cds/container/chase-lev-deque.h new file mode 100644 index 00000000..4f812584 --- /dev/null +++ b/cds/container/chase-lev-deque.h @@ -0,0 +1,125 @@ +#ifndef _CHASE_LEV_DEQUE_H +#define _CHASE_LEV_DEQUE_H + +#include +#include +#include +#include +#include + +namespace cds_others { + +#define EMPTY 0xffffffff +#define ABORT 0xfffffffe + +using std::memory_order_seq_cst; +using std::memory_order_release; +using std::memory_order_acquire; +using std::memory_order_acq_rel; +using std::memory_order_relaxed; +using std::atomic_int; +using std::atomic_size_t; +using std::atomic_uintptr_t; +using std::size_t; + +class ChaseLevDeque { +private: + atomic_size_t top; + atomic_size_t bottom; + atomic_uintptr_t array; /* Atomic(Array *) */ + +public: + struct Array { + atomic_size_t size; + atomic_int buffer[]; + }; + + ChaseLevDeque() { + Array *a = (Array *)calloc(1, sizeof(Array) + 2 * sizeof(atomic_int)); + array.store((uintptr_t)a, memory_order_relaxed); + top.store(0, memory_order_relaxed); + bottom.store(0, memory_order_relaxed); + a->size.store(2, memory_order_relaxed); + } + + int take() { + size_t b = bottom.load(memory_order_relaxed) - 1; + Array *a = (Array *)array.load(memory_order_relaxed); + bottom.store(b, memory_order_relaxed); + atomic_thread_fence(memory_order_seq_cst); + size_t t = top.load(memory_order_relaxed); + int x; + if (t <= b) { + /* Non-empty queue. */ + x = a->buffer[b % a->size.load(memory_order_relaxed)].load( + memory_order_relaxed); + if (t == b) { + /* Single last element in queue. */ + if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst, + memory_order_relaxed)) + /* Failed race. */ + x = EMPTY; + bottom.store(b + 1, memory_order_relaxed); + } + } else { /* Empty queue. */ + x = EMPTY; + bottom.store(b + 1, memory_order_relaxed); + } + return x; + } + + void resize() { + Array *a = (Array *)array.load(memory_order_relaxed); + size_t size = a->size.load(memory_order_relaxed); + size_t new_size = size << 1; + Array *new_a = + (Array *)calloc(1, new_size * sizeof(atomic_int) + sizeof(Array)); + size_t t = top.load(memory_order_relaxed); + size_t b = bottom.load(memory_order_relaxed); + new_a->size.store(new_size, memory_order_relaxed); + size_t i; + for (i = t; i < b; i++) { + new_a->buffer[i % new_size].store( + a->buffer[i % size].load(memory_order_relaxed), memory_order_relaxed); + } + array.store((uintptr_t)new_a, memory_order_release); + // std::cout << "Resize to " << new_size << "\n"; + } + + void push(int x) { + size_t b = bottom.load(memory_order_relaxed); + size_t t = top.load(memory_order_acquire); + Array *a = (Array *)array.load(memory_order_relaxed); + if (b - t > a->size.load(memory_order_relaxed) - 1) /* Full queue. */ { + resize(); + // Bug in paper...should have next line... + a = (Array *)array.load(memory_order_relaxed); + } + a->buffer[b % a->size.load(memory_order_relaxed)].store( + x, memory_order_relaxed); + atomic_thread_fence(memory_order_release); + bottom.store(b + 1, memory_order_relaxed); + } + + int steal() { + size_t t = top.load(memory_order_acquire); + atomic_thread_fence(memory_order_seq_cst); + size_t b = bottom.load(memory_order_acquire); + int x = EMPTY; + if (t < b) { + /* Non-empty queue. */ + Array *a = (Array *)array.load(memory_order_acquire); + x = a->buffer[t % a->size.load(memory_order_relaxed)].load( + memory_order_relaxed); + if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst, + memory_order_relaxed)) + /* Failed race. */ + return ABORT; + } + return x; + } +}; + +} // namespace cds_others + +#endif