From 991d0fdc0fb17f85d485c97dc2c5d8d57a98dad9 Mon Sep 17 00:00:00 2001 From: Brian Norris Date: Tue, 13 Nov 2012 17:30:30 -0800 Subject: [PATCH 1/1] spsc-bugfix: duplicate the (buggy) spsc-queue Preparing to fix the deadlock bug. --- spsc-bugfix/.gitignore | 2 + spsc-bugfix/Makefile | 23 ++++++++++ spsc-bugfix/eventcount-relacy.h | 64 +++++++++++++++++++++++++++ spsc-bugfix/eventcount.h | 69 +++++++++++++++++++++++++++++ spsc-bugfix/queue-relacy.h | 74 +++++++++++++++++++++++++++++++ spsc-bugfix/queue.h | 77 +++++++++++++++++++++++++++++++++ spsc-bugfix/spsc-queue.cc | 34 +++++++++++++++ spsc-bugfix/spsc-relacy.cc | 27 ++++++++++++ 8 files changed, 370 insertions(+) create mode 100644 spsc-bugfix/.gitignore create mode 100644 spsc-bugfix/Makefile create mode 100644 spsc-bugfix/eventcount-relacy.h create mode 100644 spsc-bugfix/eventcount.h create mode 100644 spsc-bugfix/queue-relacy.h create mode 100644 spsc-bugfix/queue.h create mode 100644 spsc-bugfix/spsc-queue.cc create mode 100644 spsc-bugfix/spsc-relacy.cc diff --git a/spsc-bugfix/.gitignore b/spsc-bugfix/.gitignore new file mode 100644 index 0000000..2485456 --- /dev/null +++ b/spsc-bugfix/.gitignore @@ -0,0 +1,2 @@ +/spsc-queue +/spsc-relacy diff --git a/spsc-bugfix/Makefile b/spsc-bugfix/Makefile new file mode 100644 index 0000000..6465176 --- /dev/null +++ b/spsc-bugfix/Makefile @@ -0,0 +1,23 @@ +include ../benchmarks.mk + +TESTNAME = spsc-queue +RELACYNAME = spsc-relacy + +all: $(TESTNAME) + +$(TESTNAME): $(TESTNAME).cc queue.h eventcount.h + $(CXX) -o $@ $< $(CPPFLAGS) $(LDFLAGS) + +relacy: $(RELACYNAME) + +$(RELACYNAME): spsc-relacy.cc queue-relacy.h eventcount-relacy.h +ifdef RELACYPATH + $(CXX) -o $(RELACYNAME) spsc-relacy.cc -I$(RELACYPATH) -Wno-deprecated +else + @echo "Please define RELACYPATH" + @echo " e.g., make RELACYPATH=/path-to-relacy" + @exit 1 +endif + +clean: + rm -f $(TESTNAME) $(RELACYNAME) *.o diff --git a/spsc-bugfix/eventcount-relacy.h b/spsc-bugfix/eventcount-relacy.h new file mode 100644 index 0000000..9eadcf3 --- /dev/null +++ b/spsc-bugfix/eventcount-relacy.h @@ -0,0 +1,64 @@ +class eventcount +{ +public: + eventcount() : waiters(0) + { + count($) = 0; + } + + void signal_relaxed() + { + unsigned cmp = count.load(std::memory_order_relaxed); + signal_impl(cmp); + } + + void signal() + { + unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst); + signal_impl(cmp); + } + + unsigned get() + { + unsigned cmp = count.fetch_or(0x80000000, +std::memory_order_seq_cst); + return cmp & 0x7FFFFFFF; + } + + void wait(unsigned cmp) + { + unsigned ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + guard.lock($); + ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + waiters($) += 1; + cv.wait(guard, $); + } + guard.unlock($); + } + } + +private: + std::atomic count; + rl::var waiters; + std::mutex guard; + std::condition_variable cv; + + void signal_impl(unsigned cmp) + { + if (cmp & 0x80000000) + { + guard.lock($); + while (false == count.compare_exchange_weak(cmp, + (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed)); + unsigned w = waiters($); + waiters($) = 0; + guard.unlock($); + if (w) + cv.notify_all($); + } + } +}; diff --git a/spsc-bugfix/eventcount.h b/spsc-bugfix/eventcount.h new file mode 100644 index 0000000..aec3e8c --- /dev/null +++ b/spsc-bugfix/eventcount.h @@ -0,0 +1,69 @@ +#include +#include +#include +#include + +class eventcount +{ +public: + eventcount() : waiters(0) + { + count = 0; + } + + void signal_relaxed() + { + unsigned cmp = count.load(std::memory_order_relaxed); + signal_impl(cmp); + } + + void signal() + { + unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst); + signal_impl(cmp); + } + + unsigned get() + { + unsigned cmp = count.fetch_or(0x80000000, +std::memory_order_seq_cst); + return cmp & 0x7FFFFFFF; + } + + void wait(unsigned cmp) + { + unsigned ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + guard.lock($); + ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + waiters += 1; + cv.wait(guard); + } + guard.unlock($); + } + } + +private: + std::atomic count; + rl::var waiters; + std::mutex guard; + std::condition_variable cv; + + void signal_impl(unsigned cmp) + { + if (cmp & 0x80000000) + { + guard.lock($); + while (false == count.compare_exchange_weak(cmp, + (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed)); + unsigned w = waiters($); + waiters = 0; + guard.unlock($); + if (w) + cv.notify_all($); + } + } +}; diff --git a/spsc-bugfix/queue-relacy.h b/spsc-bugfix/queue-relacy.h new file mode 100644 index 0000000..71aac2a --- /dev/null +++ b/spsc-bugfix/queue-relacy.h @@ -0,0 +1,74 @@ +#include "eventcount-relacy.h" + +template +class spsc_queue +{ +public: + spsc_queue() + { + node* n = new node (); + head($) = n; + tail($) = n; + } + + ~spsc_queue() + { + RL_ASSERT(head($) == tail($)); + delete ((node*)head($)); + } + + void enqueue(T data) + { + node* n = new node (data); + head($)->next.store(n, std::memory_order_release); + head($) = n; + ec.signal_relaxed(); + } + + T dequeue() + { + T data = try_dequeue(); + while (0 == data) + { + int cmp = ec.get(); + data = try_dequeue(); + if (data) + break; + ec.wait(cmp); + data = try_dequeue(); + if (data) + break; + } + return data; + } + +private: + struct node + { + std::atomic next; + rl::var data; + + node(T data = T()) + : data(data) + { + next($) = 0; + } + }; + + rl::var head; + rl::var tail; + + eventcount ec; + + T try_dequeue() + { + node* t = tail($); + node* n = t->next.load(std::memory_order_acquire); + if (0 == n) + return 0; + T data = n->data($); + delete (t); + tail($) = n; + return data; + } +}; diff --git a/spsc-bugfix/queue.h b/spsc-bugfix/queue.h new file mode 100644 index 0000000..c77425f --- /dev/null +++ b/spsc-bugfix/queue.h @@ -0,0 +1,77 @@ +#include +#include + +#include "eventcount.h" + +template +class spsc_queue +{ +public: + spsc_queue() + { + node* n = new node (); + head = n; + tail = n; + } + + ~spsc_queue() + { + RL_ASSERT(head == tail); + delete ((node*)head($)); + } + + void enqueue(T data) + { + node* n = new node (data); + head($)->next.store(n, std::memory_order_release); + head = n; + ec.signal_relaxed(); + } + + T dequeue() + { + T data = try_dequeue(); + while (0 == data) + { + int cmp = ec.get(); + data = try_dequeue(); + if (data) + break; + ec.wait(cmp); + data = try_dequeue(); + if (data) + break; + } + return data; + } + +private: + struct node + { + std::atomic next; + rl::var data; + + node(T data = T()) + : data(data) + { + next = 0; + } + }; + + rl::var head; + rl::var tail; + + eventcount ec; + + T try_dequeue() + { + node* t = tail($); + node* n = t->next.load(std::memory_order_acquire); + if (0 == n) + return 0; + T data = n->data($); + delete (t); + tail = n; + return data; + } +}; diff --git a/spsc-bugfix/spsc-queue.cc b/spsc-bugfix/spsc-queue.cc new file mode 100644 index 0000000..f8528a8 --- /dev/null +++ b/spsc-bugfix/spsc-queue.cc @@ -0,0 +1,34 @@ +#include + +#include "queue.h" + +spsc_queue *q; + + void thread(unsigned thread_index) + { + if (0 == thread_index) + { + q->enqueue(11); + } + else + { + int d = q->dequeue(); + RL_ASSERT(11 == d); + } + } + +int user_main(int argc, char **argv) +{ + thrd_t A, B; + + q = new spsc_queue(); + + thrd_create(&A, (thrd_start_t)&thread, (void *)0); + thrd_create(&B, (thrd_start_t)&thread, (void *)1); + thrd_join(A); + thrd_join(B); + + delete q; + + return 0; +} diff --git a/spsc-bugfix/spsc-relacy.cc b/spsc-bugfix/spsc-relacy.cc new file mode 100644 index 0000000..37ed989 --- /dev/null +++ b/spsc-bugfix/spsc-relacy.cc @@ -0,0 +1,27 @@ +#include + +#include "queue-relacy.h" + +struct spsc_queue_test : rl::test_suite +{ + spsc_queue q; + + void thread(unsigned thread_index) + { + if (0 == thread_index) + { + q.enqueue(11); + } + else + { + int d = q.dequeue(); + RL_ASSERT(11 == d); + } + } +}; + + +int main() +{ + rl::simulate(); +} -- 2.34.1