From 6f74d487d2d4e80c3785bef20be19d6975c45c73 Mon Sep 17 00:00:00 2001 From: Brian Norris Date: Thu, 11 Oct 2012 11:31:03 -0700 Subject: [PATCH] williams-queue: add lock-free-queue Lock-free queue code from "C++ Concurrency in Action: Practical Multithreading", by Anthony Williams Code taken from: http://www.manning.com/williams/CCiA_SourceCode.zip http://www.manning.com/williams/ Note that this commit includes several implementation of pieces of the queue. I will trim them down to a working class in the next step. --- williams-queue/williams-queue.h | 353 ++++++++++++++++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 williams-queue/williams-queue.h diff --git a/williams-queue/williams-queue.h b/williams-queue/williams-queue.h new file mode 100644 index 0000000..45edb47 --- /dev/null +++ b/williams-queue/williams-queue.h @@ -0,0 +1,353 @@ +/* + * Lock-free queue code from + * "C++ Concurrency in Action: Practical Multithreading", by Anthony Williams + * + * Code taken from: + * http://www.manning.com/williams/CCiA_SourceCode.zip + * http://www.manning.com/williams/ + */ + +// ./listing_7.13.cpp + +#include +#include + +template +class lock_free_queue +{ +private: + struct node + { + std::shared_ptr data; + node* next; + node(): + next(nullptr) + {} + }; + std::atomic head; + std::atomic tail; + node* pop_head() + { + node* const old_head=head.load(); + if(old_head==tail.load()) + { + return nullptr; + } + head.store(old_head->next); + return old_head; + } +public: + lock_free_queue(): + head(new node),tail(head.load()) + {} + lock_free_queue(const lock_free_queue& other)=delete; + lock_free_queue& operator=(const lock_free_queue& other)=delete; + ~lock_free_queue() + { + while(node* const old_head=head.load()) + { + head.store(old_head->next); + delete old_head; + } + } + std::shared_ptr pop() + { + node* old_head=pop_head(); + if(!old_head) + { + return std::shared_ptr(); + } + std::shared_ptr const res(old_head->data); + delete old_head; + return res; + } + void push(T new_value) + { + std::shared_ptr new_data(std::make_shared(new_value)); + node* p=new node; + node* const old_tail=tail.load(); + old_tail->data.swap(new_data); + old_tail->next=p; + tail.store(p); + } +}; + + +// ./listing_7.15.cpp + +#include + +template +class lock_free_queue +{ +private: + struct node; + struct counted_node_ptr + { + int external_count; + node* ptr; + }; + std::atomic head; + std::atomic tail; + struct node_counter + { + unsigned internal_count:30; + unsigned external_counters:2; + }; + struct node + { + std::atomic data; + std::atomic count; + counted_node_ptr next; + node() + { + node_counter new_count; + new_count.internal_count=0; + new_count.external_counters=2; + count.store(new_count); + next.ptr=nullptr; + next.external_count=0; + } + }; +public: + void push(T new_value) + { + std::unique_ptr new_data(new T(new_value)); + counted_node_ptr new_next; + new_next.ptr=new node; + new_next.external_count=1; + counted_node_ptr old_tail=tail.load(); + for(;;) + { + increase_external_count(tail,old_tail); + T* old_data=nullptr; + if(old_tail.ptr->data.compare_exchange_strong( + old_data,new_data.get())) + { + old_tail.ptr->next=new_next; + old_tail=tail.exchange(new_next); + free_external_counter(old_tail); + new_data.release(); + break; + } + old_tail.ptr->release_ref(); + } + } +}; + + +// ./listing_7.16.cpp + +template +class lock_free_queue +{ +private: + struct node + { + void release_ref(); + }; +public: + std::unique_ptr pop() + { + counted_node_ptr old_head=head.load(std::memory_order_relaxed); + for(;;) + { + increase_external_count(head,old_head); + node* const ptr=old_head.ptr; + if(ptr==tail.load().ptr) + { + ptr->release_ref(); + return std::unique_ptr(); + } + if(head.compare_exchange_strong(old_head,ptr->next)) + { + T* const res=ptr->data.exchange(nullptr); + free_external_counter(old_head); + return std::unique_ptr(res); + } + ptr->release_ref(); + } + } +}; + + +// ./listing_7.17.cpp + +template +class lock_free_queue +{ +private: + struct node + { + void release_ref() + { + node_counter old_counter= + count.load(std::memory_order_relaxed); + node_counter new_counter; + do + { + new_counter=old_counter; + --new_counter.internal_count; + } + while(!count.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + if(!new_counter.internal_count && + !new_counter.external_counters) + { + delete this; + } + } + }; +}; + + +// ./listing_7.18.cpp + +template +class lock_free_queue +{ +private: + static void increase_external_count( + std::atomic& counter, + counted_node_ptr& old_counter) + { + counted_node_ptr new_counter; + do + { + new_counter=old_counter; + ++new_counter.external_count; + } + while(!counter.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + old_counter.external_count=new_counter.external_count; + } +}; + + +// ./listing_7.19.cpp + +template +class lock_free_queue +{ +private: + static void free_external_counter(counted_node_ptr &old_node_ptr) + { + node* const ptr=old_node_ptr.ptr; + int const count_increase=old_node_ptr.external_count-2; + node_counter old_counter= + ptr->count.load(std::memory_order_relaxed); + node_counter new_counter; + do + { + new_counter=old_counter; + --new_counter.external_counters; + new_counter.internal_count+=count_increase; + } + while(!ptr->count.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + if(!new_counter.internal_count && + !new_counter.external_counters) + { + delete ptr; + } + } +}; + + +// ./listing_7.20.cpp + +template +class lock_free_queue +{ +private: + struct node + { + std::atomic data; + std::atomic count; + std::atomic next; + }; +public: + std::unique_ptr pop() + { + counted_node_ptr old_head=head.load(std::memory_order_relaxed); + for(;;) + { + increase_external_count(head,old_head); + node* const ptr=old_head.ptr; + if(ptr==tail.load().ptr) + { + return std::unique_ptr(); + } + counted_node_ptr next=ptr->next.load(); + if(head.compare_exchange_strong(old_head,next)) + { + T* const res=ptr->data.exchange(nullptr); + free_external_counter(old_head); + return std::unique_ptr(res); + } + ptr->release_ref(); + } + } +}; + + +// ./listing_7.21.cpp + +template +class lock_free_queue +{ +private: + void set_new_tail(counted_node_ptr &old_tail, + counted_node_ptr const &new_tail) + { + node* const current_tail_ptr=old_tail.ptr; + while(!tail.compare_exchange_weak(old_tail,new_tail) && + old_tail.ptr==current_tail_ptr); + if(old_tail.ptr==current_tail_ptr) + free_external_counter(old_tail); + else + current_tail_ptr->release_ref(); + } +public: + void push(T new_value) + { + std::unique_ptr new_data(new T(new_value)); + counted_node_ptr new_next; + new_next.ptr=new node; + new_next.external_count=1; + counted_node_ptr old_tail=tail.load(); + for(;;) + { + increase_external_count(tail,old_tail); + T* old_data=nullptr; + if(old_tail.ptr->data.compare_exchange_strong( + old_data,new_data.get())) + { + counted_node_ptr old_next={0}; + if(!old_tail.ptr->next.compare_exchange_strong( + old_next,new_next)) + { + delete new_next.ptr; + new_next=old_next; + } + set_new_tail(old_tail, new_next); + new_data.release(); + break; + } + else + { + counted_node_ptr old_next={0}; + if(old_tail.ptr->next.compare_exchange_strong( + old_next,new_next)) + { + old_next=new_next; + new_next.ptr=new node; + } + set_new_tail(old_tail, old_next); + } + } + } +}; -- 2.34.1