From 62388a2f7263a9ee49226dbf4328e2677b07f0d7 Mon Sep 17 00:00:00 2001 From: Brian Norris Date: Wed, 10 Oct 2012 13:54:56 -0700 Subject: [PATCH] mpmc-queue: add MPMC queue header From first example at: http://cbloomrants.blogspot.com/2011/07/07-30-11-look-at-some-bounded-queues.html --- mpmc-queue/mpmc-queue.h | 87 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 mpmc-queue/mpmc-queue.h diff --git a/mpmc-queue/mpmc-queue.h b/mpmc-queue/mpmc-queue.h new file mode 100644 index 0000000..2fe5ad1 --- /dev/null +++ b/mpmc-queue/mpmc-queue.h @@ -0,0 +1,87 @@ +template +struct mpmc_boundq_1_alt +{ +private: + + // elements should generally be cache-line-size padded : + nonatomic m_array[t_size]; + + // rdwr counts the reads & writes that have started + atomic m_rdwr; + // "read" and "written" count the number completed + atomic m_read; + atomic m_written; + +public: + + mpmc_boundq_1_alt() : m_rdwr(0), m_read(0), m_written(0) + { + } + + //----------------------------------------------------- + + nonatomic * read_fetch() { + unsigned int rdwr = m_rdwr($).load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == rd ) // empty + return false; + + if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) ) + break; + } + + // (*1) + rl::backoff bo; + while ( (m_written($).load(mo_acquire) & 0xFFFF) != wr ) { + bo.yield($); + } + + nonatomic * p = & ( m_array[ rd % t_size ] ); + + return p; + } + + void read_consume() { + m_read($).fetch_add(1,mo_release); + } + + //----------------------------------------------------- + + nonatomic * write_prepare() { + unsigned int rdwr = m_rdwr($).load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == ((rd + t_size)&0xFFFF) ) // full + return NULL; + + if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) ) + break; + } + + // (*1) + rl::backoff bo; + while ( (m_read($).load(mo_acquire) & 0xFFFF) != rd ) { + bo.yield($); + } + + nonatomic * p = & ( m_array[ wr % t_size ] ); + + return p; + } + + void write_publish() + { + m_written($).fetch_add(1,mo_release); + } + + //----------------------------------------------------- + + +}; -- 2.34.1