From: Peizhao Ou Date: Sat, 11 Jan 2014 01:29:30 +0000 (-0800) Subject: more data structures X-Git-Url: http://plrg.eecs.uci.edu/git/?p=cdsspec-compiler.git;a=commitdiff_plain;h=0a040a5b619952d29338f73db8f685d7367178fc more data structures --- diff --git a/benchmark/chase-lev-deque-bugfix/Makefile b/benchmark/chase-lev-deque-bugfix/Makefile new file mode 100644 index 0000000..91ff999 --- /dev/null +++ b/benchmark/chase-lev-deque-bugfix/Makefile @@ -0,0 +1,17 @@ +include ../benchmarks.mk + +TESTNAME = main + +HEADERS = deque.h +OBJECTS = main.o deque.o + +all: $(TESTNAME) + +$(TESTNAME): $(HEADERS) $(OBJECTS) + $(CC) -o $@ $(OBJECTS) $(CPPFLAGS) $(LDFLAGS) + +%.o: %.c + $(CC) -c -o $@ $< $(CPPFLAGS) + +clean: + rm -f $(TESTNAME) *.o diff --git a/benchmark/chase-lev-deque-bugfix/deque.c b/benchmark/chase-lev-deque-bugfix/deque.c new file mode 100644 index 0000000..6328446 --- /dev/null +++ b/benchmark/chase-lev-deque-bugfix/deque.c @@ -0,0 +1,85 @@ +#include +#include +#include "deque.h" +#include +#include + +Deque * create() { + Deque * q = (Deque *) calloc(1, sizeof(Deque)); + Array * a = (Array *) calloc(1, sizeof(Array)+2*sizeof(atomic_int)); + atomic_store_explicit(&q->array, a, memory_order_relaxed); + atomic_store_explicit(&q->top, 0, memory_order_relaxed); + atomic_store_explicit(&q->bottom, 0, memory_order_relaxed); + atomic_store_explicit(&a->size, 2, memory_order_relaxed); + return q; +} + +int take(Deque *q) { + size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; + Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed); + atomic_store_explicit(&q->bottom, b, memory_order_relaxed); + atomic_thread_fence(memory_order_seq_cst); + size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); + int x; + if (t <= b) { + /* Non-empty queue. */ + x = atomic_load_explicit(&a->buffer[b % atomic_load_explicit(&a->size,memory_order_relaxed)], memory_order_relaxed); + if (t == b) { + /* Single last element in queue. */ + if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + /* Failed race. */ + x = EMPTY; + atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); + } + } else { /* Empty queue. */ + x = EMPTY; + atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); + } + return x; +} + +void resize(Deque *q) { + Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed); + size_t size=atomic_load_explicit(&a->size, memory_order_relaxed); + size_t new_size=size << 1; + Array *new_a = (Array *) calloc(1, new_size * sizeof(atomic_int) + sizeof(Array)); + size_t top=atomic_load_explicit(&q->top, memory_order_relaxed); + size_t bottom=atomic_load_explicit(&q->bottom, memory_order_relaxed); + atomic_store_explicit(&new_a->size, new_size, memory_order_relaxed); + size_t i; + for(i=top; i < bottom; i++) { + atomic_store_explicit(&new_a->buffer[i % new_size], atomic_load_explicit(&a->buffer[i % size], memory_order_relaxed), memory_order_relaxed); + } + atomic_store_explicit(&q->array, new_a, memory_order_release); + printf("resize\n"); +} + +void push(Deque *q, int x) { + size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); + size_t t = atomic_load_explicit(&q->top, memory_order_acquire); + Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed); + if (b - t > atomic_load_explicit(&a->size, memory_order_relaxed) - 1) /* Full queue. */ { + resize(q); + //Bug in paper...should have next line... + a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed); + } + atomic_store_explicit(&a->buffer[b % atomic_load_explicit(&a->size, memory_order_relaxed)], x, memory_order_relaxed); + atomic_thread_fence(memory_order_release); + atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); +} + +int steal(Deque *q) { + size_t t = atomic_load_explicit(&q->top, memory_order_acquire); + atomic_thread_fence(memory_order_seq_cst); + size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); + int x = EMPTY; + if (t < b) { + /* Non-empty queue. */ + Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_acquire); + x = atomic_load_explicit(&a->buffer[t % atomic_load_explicit(&a->size, memory_order_relaxed)], memory_order_relaxed); + if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + /* Failed race. */ + return ABORT; + } + return x; +} diff --git a/benchmark/chase-lev-deque-bugfix/deque.h b/benchmark/chase-lev-deque-bugfix/deque.h new file mode 100644 index 0000000..bc670e7 --- /dev/null +++ b/benchmark/chase-lev-deque-bugfix/deque.h @@ -0,0 +1,22 @@ +#ifndef DEQUE_H +#define DEQUE_H + +typedef struct { + atomic_size_t size; + atomic_int buffer[]; +} Array; + +typedef struct { + atomic_size_t top, bottom; + atomic_uintptr_t array; /* Atomic(Array *) */ +} Deque; + +Deque * create(); +int take(Deque *q); +void resize(Deque *q); +void push(Deque *q, int x); + +#define EMPTY 0xffffffff +#define ABORT 0xfffffffe + +#endif diff --git a/benchmark/chase-lev-deque-bugfix/main.c b/benchmark/chase-lev-deque-bugfix/main.c new file mode 100644 index 0000000..f2e8dca --- /dev/null +++ b/benchmark/chase-lev-deque-bugfix/main.c @@ -0,0 +1,46 @@ +#include +#include +#include +#include +#include + +#include "model-assert.h" + +#include "deque.h" + +Deque *q; +int a; +int b; +int c; + +static void task(void * param) { + a=steal(q); +} + +int user_main(int argc, char **argv) +{ + thrd_t t; + q=create(); + thrd_create(&t, task, 0); + push(q, 1); + push(q, 2); + push(q, 4); + b=take(q); + c=take(q); + thrd_join(t); + + bool correct=true; + if (a!=1 && a!=2 && a!=4 && a!= EMPTY) + correct=false; + if (b!=1 && b!=2 && b!=4 && b!= EMPTY) + correct=false; + if (c!=1 && c!=2 && c!=4 && a!= EMPTY) + correct=false; + if (a!=EMPTY && b!=EMPTY && c!=EMPTY && (a+b+c)!=7) + correct=false; + if (!correct) + printf("a=%d b=%d c=%d\n",a,b,c); + MODEL_ASSERT(correct); + + return 0; +} diff --git a/benchmark/mcs-lock/.gitignore b/benchmark/mcs-lock/.gitignore new file mode 100644 index 0000000..aef308c --- /dev/null +++ b/benchmark/mcs-lock/.gitignore @@ -0,0 +1 @@ +/mcs-lock diff --git a/benchmark/mcs-lock/Makefile b/benchmark/mcs-lock/Makefile new file mode 100644 index 0000000..5a311b3 --- /dev/null +++ b/benchmark/mcs-lock/Makefile @@ -0,0 +1,11 @@ +include ../benchmarks.mk + +TESTNAME = mcs-lock + +all: $(TESTNAME) + +$(TESTNAME): $(TESTNAME).cc $(TESTNAME).h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + +clean: + rm -f $(TESTNAME) *.o diff --git a/benchmark/mcs-lock/mcs-lock.cc b/benchmark/mcs-lock/mcs-lock.cc new file mode 100644 index 0000000..ec0cc5d --- /dev/null +++ b/benchmark/mcs-lock/mcs-lock.cc @@ -0,0 +1,43 @@ +#include +#include + +#include "mcs-lock.h" + +/* For data race instrumentation */ +#include "librace.h" + +struct mcs_mutex *mutex; +static uint32_t shared; + +void threadA(void *arg) +{ + mcs_mutex::guard g(mutex); + printf("store: %d\n", 17); + store_32(&shared, 17); + mutex->unlock(&g); + mutex->lock(&g); + printf("load: %u\n", load_32(&shared)); +} + +void threadB(void *arg) +{ + mcs_mutex::guard g(mutex); + printf("load: %u\n", load_32(&shared)); + mutex->unlock(&g); + mutex->lock(&g); + printf("store: %d\n", 17); + store_32(&shared, 17); +} + +int user_main(int argc, char **argv) +{ + thrd_t A, B; + + mutex = new mcs_mutex(); + + thrd_create(&A, &threadA, NULL); + thrd_create(&B, &threadB, NULL); + thrd_join(A); + thrd_join(B); + return 0; +} diff --git a/benchmark/mcs-lock/mcs-lock.h b/benchmark/mcs-lock/mcs-lock.h new file mode 100644 index 0000000..6dc66ed --- /dev/null +++ b/benchmark/mcs-lock/mcs-lock.h @@ -0,0 +1,175 @@ +// mcs on stack + +#include +#include + + +/** + Properties to check: + 1. At any point, only one thread can acquire the mutex; when any thread + nlock the mutex, he must have it in his hand. + 2. The sequence of the lock is guaranteed, which means there is a queue for + all the lock operations. + #### + 3. Should establish the happens-before relationship between unlock and lock +*/ + +/** + @Begin + @Global_define: + # The invariant is that the thread that has a guard at the head of the + # queue should be the one who currently has the lock. + + int __lock_acquired = 0; + queue __lock_waiting_queue; + + @Happens-before: + # Without any specifying, this means the beginning of a successful Unlock() + # happens before the end of the next successful Lock(). + Unlock -> Lock + @End +*/ + +struct mcs_node { + std::atomic next; + std::atomic gate; + + mcs_node() { + next.store(0); + gate.store(0); + } +}; + +struct mcs_mutex { +public: + // tail is null when lock is not held + std::atomic m_tail; + + mcs_mutex() { + m_tail.store( NULL ); + } + ~mcs_mutex() { + ASSERT( m_tail.load() == NULL ); + } + + // Each thread will have their own guard. + class guard { + public: + mcs_mutex * m_t; + mcs_node m_node; // node held on the stack + + guard(mcs_mutex * t) : m_t(t) { t->lock(this); } + ~guard() { m_t->unlock(this); } + }; + + + /** + @Begin + # This function will soon enqueue the current guard to the queue to make + # sure it will get fair lock successfully. + @Interface: Lock + @Commit_point_set: Lock_Enqueue_Point + @Action: + @Code: + __lock_waiting_queue.enqueue(I); + @Post_action: + __lock_acquired++; + @Post_check: + # Make sure when it successfully locks, the lock is not acquired yet + # and the locking is in a FIFO order + __lock_acquired == 1 && I == __lock_waiting_queue.peak() + @End + */ + void lock(guard * I) { + mcs_node * me = &(I->m_node); + + // set up my node : + // not published yet so relaxed : + me->next.store(NULL, std::mo_relaxed ); + me->gate.store(1, std::mo_relaxed ); + + // publish my node as the new tail : + mcs_node * pred = m_tail.exchange(me, std::mo_acq_rel); + /** + # Once the exchange completes, the thread already claimed the next + # available slot for the lock + @Begin + @Commit_point_check_define: true + @Label: Lock_Enqueue_Point + @End + */ + if ( pred != NULL ) { + // (*1) race here + // unlock of pred can see me in the tail before I fill next + + // publish me to previous lock-holder : + pred->next.store(me, std::mo_release ); + + // (*2) pred not touched any more + + // now this is the spin - + // wait on predecessor setting my flag - + rl::linear_backoff bo; + int my_gate = 1; + while ( (my_gate = me->gate.load(std::mo_acquire)) ) { + thrd_yield(); + } + } + } + + /** + @Begin + @Interface: Unlock + @Commit_point_set: + Unlock = Unlock_Point_Success_1 | Unlock_Point_Success_2 + @Check: + lock_acquired == 1 && I == lock_waiting_queue.peak() + @Action: + @Code: + lock_acquired--; + lock_waiting_queue.dequeue(); + @End + */ + void unlock(guard * I) { + mcs_node * me = &(I->m_node); + + mcs_node * next = me->next.load(std::mo_acquire); + if ( next == NULL ) + { + mcs_node * tail_was_me = me; + bool success; + if ( (success = m_tail.compare_exchange_strong( + tail_was_me,NULL,std::mo_acq_rel)) ) { + /** + @Begin + @Commit_point_check_define: __ATOMIC_RET__ == true + @Label: Unlock_Point_Success_1 + @End + */ + // got null in tail, mutex is unlocked + return; + } + + // (*1) catch the race : + rl::linear_backoff bo; + for(;;) { + next = me->next.load(std::mo_acquire); + if ( next != NULL ) + break; + thrd_yield(); + } + } + + // (*2) - store to next must be done, + // so no locker can be viewing my node any more + + // let next guy in : + next->gate.store( 0, std::mo_release ); + /** + @Begin + @Commit_point_check_define: true + @Label: Unlock_Point_Success_2 + @End + */ + } +}; diff --git a/benchmark/mpmc-queue/.gitignore b/benchmark/mpmc-queue/.gitignore new file mode 100644 index 0000000..4b20d5b --- /dev/null +++ b/benchmark/mpmc-queue/.gitignore @@ -0,0 +1,8 @@ +/mpmc-queue +/mpmc-1r2w +/mpmc-2r1w +/mpmc-queue-noinit +/mpmc-1r2w-noinit +/mpmc-2r1w-noinit +/mpmc-queue-rdwr +/mpmc-rdwr-noinit diff --git a/benchmark/mpmc-queue/Makefile b/benchmark/mpmc-queue/Makefile new file mode 100644 index 0000000..8d9ad1e --- /dev/null +++ b/benchmark/mpmc-queue/Makefile @@ -0,0 +1,22 @@ +include ../benchmarks.mk + +TESTNAME = mpmc-queue +TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr +TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit + +all: $(TESTS) + +mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 +mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 +mpmc-1r2w: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 +mpmc-2r1w: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 +mpmc-queue-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT +mpmc-1r2w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT +mpmc-2r1w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -DCONFIG_MPMC_NO_INITIAL_ELEMENT +mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT + +$(TESTS): $(TESTNAME).cc $(TESTNAME).h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + +clean: + rm -f $(TESTS) *.o diff --git a/benchmark/mpmc-queue/mpmc-queue.cc b/benchmark/mpmc-queue/mpmc-queue.cc new file mode 100644 index 0000000..b62d8d3 --- /dev/null +++ b/benchmark/mpmc-queue/mpmc-queue.cc @@ -0,0 +1,140 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + while (bin = queue->read_fetch()) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +void threadC(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); + + while (bin = queue->read_fetch()) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +#define MAXREADERS 3 +#define MAXWRITERS 3 +#define MAXRDWR 3 + +#ifdef CONFIG_MPMC_READERS +#define DEFAULT_READERS (CONFIG_MPMC_READERS) +#else +#define DEFAULT_READERS 2 +#endif + +#ifdef CONFIG_MPMC_WRITERS +#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS) +#else +#define DEFAULT_WRITERS 2 +#endif + +#ifdef CONFIG_MPMC_RDWR +#define DEFAULT_RDWR (CONFIG_MPMC_RDWR) +#else +#define DEFAULT_RDWR 0 +#endif + +int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR; + +void print_usage() +{ + printf("Error: use the following options\n" + " -r Choose number of reader threads\n" + " -w Choose number of writer threads\n"); + exit(EXIT_FAILURE); +} + +void process_params(int argc, char **argv) +{ + const char *shortopts = "hr:w:"; + int opt; + bool error = false; + + while (!error && (opt = getopt(argc, argv, shortopts)) != -1) { + switch (opt) { + case 'h': + print_usage(); + break; + case 'r': + readers = atoi(optarg); + break; + case 'w': + writers = atoi(optarg); + break; + default: /* '?' */ + error = true; + break; + } + } + + if (writers < 1 || writers > MAXWRITERS) + error = true; + if (readers < 1 || readers > MAXREADERS) + error = true; + + if (error) + print_usage(); +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR]; + + /* Note: optarg() / optind is broken in model-checker - workaround is + * to just copy&paste this test a few times */ + //process_params(argc, argv); + printf("%d reader(s), %d writer(s)\n", readers, writers); + +#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT + printf("Adding initial element\n"); + int32_t *bin = queue.write_prepare(); + store_32(bin, 17); + queue.write_publish(); +#endif + + printf("Start threads\n"); + + for (int i = 0; i < writers; i++) + thrd_create(&A[i], (thrd_start_t)&threadA, &queue); + for (int i = 0; i < readers; i++) + thrd_create(&B[i], (thrd_start_t)&threadB, &queue); + + for (int i = 0; i < rdwr; i++) + thrd_create(&C[i], (thrd_start_t)&threadC, &queue); + + for (int i = 0; i < writers; i++) + thrd_join(A[i]); + for (int i = 0; i < readers; i++) + thrd_join(B[i]); + for (int i = 0; i < rdwr; i++) + thrd_join(C[i]); + + printf("Threads complete\n"); + + return 0; +} diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h new file mode 100644 index 0000000..1e59f43 --- /dev/null +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -0,0 +1,123 @@ +#include +#include + +template +struct mpmc_boundq_1_alt +{ +private: + + // elements should generally be cache-line-size padded : + t_element m_array[t_size]; + + // rdwr counts the reads & writes that have started + atomic m_rdwr; + // "read" and "written" count the number completed + atomic m_read; + atomic m_written; + +public: + + mpmc_boundq_1_alt() + { + m_rdwr = 0; + m_read = 0; + m_written = 0; + } + + + /** + @Global_define: + Order_queue spec_queue; + */ + + //----------------------------------------------------- + + t_element * read_fetch() { + unsigned int rdwr = m_rdwr.load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == rd ) { // empty + return false; + } + + if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { + thrd_yield(); + } + + t_element * p = & ( m_array[ rd % t_size ] ); + + /** + @Commit_point_Check: true + @Label: ANY + @Check: + spec_queue.peak() == p + */ + return p; + } + + void read_consume() { + m_read.fetch_add(1,mo_release); + /** + @Commit_point_define: true + @Label: Read_Consume_Success + @Check: + spec_queue.size() > 0 + @Action: + spec_queue.remove(); + */ + } + + //----------------------------------------------------- + + t_element * write_prepare() { + unsigned int rdwr = m_rdwr.load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == ((rd + t_size)&0xFFFF) ) // full + return NULL; + + if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) { + thrd_yield(); + } + + + t_element * p = & ( m_array[ wr % t_size ] ); + + /** + @Commit_point_check: ANY + @Action: spec_queue.add(p); + */ + return p; + } + + void write_publish() + { + m_written.fetch_add(1,mo_release); + } + + //----------------------------------------------------- + + +}; diff --git a/benchmark/read-copy-update/Makefile b/benchmark/read-copy-update/Makefile new file mode 100644 index 0000000..08941cf --- /dev/null +++ b/benchmark/read-copy-update/Makefile @@ -0,0 +1,11 @@ +include ../benchmarks.mk + +TESTNAME = rcu + +all: $(TESTNAME) + +$(TESTNAME): $(TESTNAME).cc $(TESTNAME).h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + +clean: + rm -f $(TESTNAME) *.o diff --git a/benchmark/read-copy-update/rcu.cc b/benchmark/read-copy-update/rcu.cc new file mode 100644 index 0000000..ba8b9bd --- /dev/null +++ b/benchmark/read-copy-update/rcu.cc @@ -0,0 +1,85 @@ +#include + +using namespace std; + +/** + This is an example about how to specify the correctness of the + read-copy-update synchronization mechanism. +*/ + +// Properties to check: + + +typedef void* (*read_func_ptr_t)(void*); + +template +class rcu { + /** + @Begin + @Global_define: + Type *__cur_data; + + static bool equals(void *ptr1, void *ptr2) { + // ... + // Return if the two data instances pointed to equals to each + // other + } + + @Happens-before: + Write -> Read + Write -> Write + @End + */ +private: + atomic _data; + + public: + /** + @Begin + @Interface: Read + @Commit_point_set: Read_Success_Point + @Action: + @Code: + void *_Old_Data = __cur_data; + @Post_check: + equals(__RET__, _Old_Data->read()) + @End + */ + void* read() { + void *res = NULL; + Type *cur_data_ptr = _data.load(memory_order_acquire); + /** + @Begin + @Commit_point_check_define: true + @Label: Read_Success_Point + @End + */ + res = cur_data_ptr->read(); + return res; + } + + /** + @Begin + @Interface: Write + @Commit_point_set: Write_Success_Point + @Action: + @Code: + __cur_data = new_data; + @End + */ + void write(Type *new_data) { + while (true) { + Type *prev = _data.load(memory_order_acquire); + if (_data.compare_exchange_weak(prev, new_data, + memory_order_release, memory_order_release)) { + /** + @Begin + @Commit_point_check_define: __ATOMIC_RET__ == true + @Label: Write_Success_Point + @End + */ + break; + } + } + } +}; diff --git a/benchmark/seqlock/Makefile b/benchmark/seqlock/Makefile new file mode 100644 index 0000000..071a552 --- /dev/null +++ b/benchmark/seqlock/Makefile @@ -0,0 +1,11 @@ +include ../benchmarks.mk + +TESTNAME = seqlock + +all: $(TESTNAME) + +$(TESTNAME): $(TESTNAME).cc $(TESTNAME).h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + +clean: + rm -f $(TESTNAME) *.o diff --git a/benchmark/seqlock/seqlock.cc b/benchmark/seqlock/seqlock.cc new file mode 100644 index 0000000..f25434d --- /dev/null +++ b/benchmark/seqlock/seqlock.cc @@ -0,0 +1,53 @@ +#include +#include + +#include "seqlock.h" + +class IntWrapper { + private: + int _val; + public: + + IntWrapper(int val) : _val(val) {} + + IntWrapper() : _val(0) {} + + IntWrapper(IntWrapper& copy) : _val(copy._val) {} + + int get() { + return _val; + } +}; + +static void* read_int(void *int_wrapper) { + IntWrapper *ptr = (IntWrapper*) int_wrapper; + return (void*) new int(ptr->get()); +} + + +static IntWrapper *shared_int = new IntWrapper(10); +static seqlock my_lock(shared_int); + +static void read_thrd(void *obj) { + void *res = my_lock.read(read_int); + cout << "Thread read: " << *((int*) res) << endl; +} + +static void write_thrd(void *obj) { + IntWrapper *new_int = new IntWrapper(1024); + my_lock.write(new_int); + cout << "Thread write: " << new_int->get() << endl; +} + +int main(int argc, char *argv[]) { + /* + thrd_t t1, t2, t3; + thrd_create(&t1, (thrd_start_t) &read_thrd, NULL); + thrd_create(&t2, (thrd_start_t) &write_thrd, NULL); + thrd_create(&t3, (thrd_start_t) &read_thrd, NULL); + */ + read_thrd(NULL); + write_thrd(NULL); + read_thrd(NULL); + +} diff --git a/benchmark/seqlock/seqlock.h b/benchmark/seqlock/seqlock.h new file mode 100644 index 0000000..92a3b63 --- /dev/null +++ b/benchmark/seqlock/seqlock.h @@ -0,0 +1,162 @@ +#ifndef _SEQLOCK_H +#define _SEQLOCK_H + +#include + +using namespace std; + +typedef void* (*read_func_ptr_t)(void*); + +/** + Properties to check: + Every read will read the up-to-date value, and only one thread can be + writing. +*/ + + +/** + @Begin + @Global_define: + Type *__data_ptr = NULL; + bool __is_writing = false; + + static bool _is_equals(void* ptr1, void* ptr2) { + //... Should be defined to check if the internal data equals + } + + @Happens-before: + Init -> Read + Init -> Write + Write -> Write + Write -> Read + @End +*/ + +/** + Fixed the write to be lock-free. Use a CAS in the write instead of using the + mutex. There are a couple options for the implementation according to Hans + Boehm's paper <>. + + Interesting thing in the read() function is the memory ordering we should + impose there. In Hans Boehm's paper, he presents 3 ways to do it. We will + try to use the fences one here as a special case to check programs written + with C++ fences. +*/ + +template +class seqlock { + private: + // To make the write lock-free, we don't need the mutex here. + // Mutex to write exclusively + + // Sequence for reader consistency check + atomic_int _seq; + // The shared data structure to be protected; + // It needs to be atomic to avoid data races + atomic _data; + + // This function is to check if the objects pointed by the two pointer are + // the same; it is only used for checking the correctness + bool is_equal(void *ptr1, void *ptr2) { + // return ptr1 == NULL ? false : ptr1->equals(ptr2); + } + + public: + /** + @Begin + @Interface: Init + @Commit_point_set: Init_Point + @Action: + @Code: + __data_ptr = data; + @End + */ + seqlock(Type *data) { + _data.store(data, memory_order_relaxed); + _seq.store(0, memory_order_release); + /** + # Initialization code + @Begin + @Commit_point_check_define: true + @Label: Init_Point + @End + */ + } + + ~seqlock() {} + + /** + @Begin + @Interface: Read + @Commit_point_set: Read_Success_Point + @Action: + @Code: + void *_Old_data_ptr = __data_ptr; + @Post_check: + _is_equal(read_func(_Old_data_ptr), __RET__) + @End + */ + void* read(read_func_ptr_t read_func) { + while (true) { + int old_seq = _seq.load(memory_order_acquire); + if (old_seq % 2 == 1) continue; + // A customized read of something return a pointer to it + // Potential data race, should be atomic + void *res = read_func(_data.load(memory_order_relaxed)); + // This is subtle to use an acquire fence here + // What we actually need is a #LoadLoad fence + atomic_thread_fence(memory_order_acquire); + int new_seq; + if ((new_seq = _seq.load(memory_order_relaxed)) == old_seq) { + /** + @Begin + @Commit_point_check_define: __ATOMIC_RET__ == old_seq + @Label: Read_Success_Point + @End + */ + return res; + } + // _data has been updated, read should retry + } + } + + /** + @Begin + @Interface: Write + @Commit_point_set: Write_Success_Point + @Action: + @Code: + __data_ptr = new_data; + @End + */ + + void write(Type *new_data) { + while (true) { + // This might be a relaxed too + // Should think about how to read the lo + int old_seq = _seq.load(memory_order_acquire); + if (old_seq % 2 == 1) { + // Backoff, another thread is writing + //thrd_yield(); + continue; // Retry + } + // Should be relaxed!!! + if (_seq.compare_exchange_strong(old_seq, old_seq + 1, + memory_order_relaxed, memory_order_relaxed)) + break; + } + // Should be a store release, to synchronize with the fence!!! + _data.store(new_data, memory_order_release); + _seq.fetch_add(1, memory_order_release); + /** + @Begin + @Commit_point_define: true + @Label: Write_Success_Point + @End + */ + } +}; + + +#endif diff --git a/benchmark/spsc-bugfix/.gitignore b/benchmark/spsc-bugfix/.gitignore new file mode 100644 index 0000000..2485456 --- /dev/null +++ b/benchmark/spsc-bugfix/.gitignore @@ -0,0 +1,2 @@ +/spsc-queue +/spsc-relacy diff --git a/benchmark/spsc-bugfix/Makefile b/benchmark/spsc-bugfix/Makefile new file mode 100644 index 0000000..33b9d01 --- /dev/null +++ b/benchmark/spsc-bugfix/Makefile @@ -0,0 +1,23 @@ +include ../benchmarks.mk + +TESTNAME = spsc-queue +RELACYNAME = spsc-relacy + +all: $(TESTNAME) + +$(TESTNAME): $(TESTNAME).cc queue.h eventcount.h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + +relacy: $(RELACYNAME) + +$(RELACYNAME): spsc-relacy.cc queue-relacy.h eventcount-relacy.h +ifdef RELACYPATH + $(CXX) -o $(RELACYNAME) spsc-relacy.cc -I$(RELACYPATH) -Wno-deprecated +else + @echo "Please define RELACYPATH" + @echo " e.g., make RELACYPATH=/path-to-relacy" + @exit 1 +endif + +clean: + rm -f $(TESTNAME) $(RELACYNAME) *.o diff --git a/benchmark/spsc-bugfix/eventcount-relacy.h b/benchmark/spsc-bugfix/eventcount-relacy.h new file mode 100644 index 0000000..9eadcf3 --- /dev/null +++ b/benchmark/spsc-bugfix/eventcount-relacy.h @@ -0,0 +1,64 @@ +class eventcount +{ +public: + eventcount() : waiters(0) + { + count($) = 0; + } + + void signal_relaxed() + { + unsigned cmp = count.load(std::memory_order_relaxed); + signal_impl(cmp); + } + + void signal() + { + unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst); + signal_impl(cmp); + } + + unsigned get() + { + unsigned cmp = count.fetch_or(0x80000000, +std::memory_order_seq_cst); + return cmp & 0x7FFFFFFF; + } + + void wait(unsigned cmp) + { + unsigned ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + guard.lock($); + ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + waiters($) += 1; + cv.wait(guard, $); + } + guard.unlock($); + } + } + +private: + std::atomic count; + rl::var waiters; + std::mutex guard; + std::condition_variable cv; + + void signal_impl(unsigned cmp) + { + if (cmp & 0x80000000) + { + guard.lock($); + while (false == count.compare_exchange_weak(cmp, + (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed)); + unsigned w = waiters($); + waiters($) = 0; + guard.unlock($); + if (w) + cv.notify_all($); + } + } +}; diff --git a/benchmark/spsc-bugfix/eventcount.h b/benchmark/spsc-bugfix/eventcount.h new file mode 100644 index 0000000..aec3e8c --- /dev/null +++ b/benchmark/spsc-bugfix/eventcount.h @@ -0,0 +1,69 @@ +#include +#include +#include +#include + +class eventcount +{ +public: + eventcount() : waiters(0) + { + count = 0; + } + + void signal_relaxed() + { + unsigned cmp = count.load(std::memory_order_relaxed); + signal_impl(cmp); + } + + void signal() + { + unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst); + signal_impl(cmp); + } + + unsigned get() + { + unsigned cmp = count.fetch_or(0x80000000, +std::memory_order_seq_cst); + return cmp & 0x7FFFFFFF; + } + + void wait(unsigned cmp) + { + unsigned ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + guard.lock($); + ec = count.load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + waiters += 1; + cv.wait(guard); + } + guard.unlock($); + } + } + +private: + std::atomic count; + rl::var waiters; + std::mutex guard; + std::condition_variable cv; + + void signal_impl(unsigned cmp) + { + if (cmp & 0x80000000) + { + guard.lock($); + while (false == count.compare_exchange_weak(cmp, + (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed)); + unsigned w = waiters($); + waiters = 0; + guard.unlock($); + if (w) + cv.notify_all($); + } + } +}; diff --git a/benchmark/spsc-bugfix/queue-relacy.h b/benchmark/spsc-bugfix/queue-relacy.h new file mode 100644 index 0000000..7833576 --- /dev/null +++ b/benchmark/spsc-bugfix/queue-relacy.h @@ -0,0 +1,74 @@ +#include "eventcount-relacy.h" + +template +class spsc_queue +{ +public: + spsc_queue() + { + node* n = new node (); + head($) = n; + tail($) = n; + } + + ~spsc_queue() + { + RL_ASSERT(head($) == tail($)); + delete ((node*)head($)); + } + + void enqueue(T data) + { + node* n = new node (data); + head($)->next.store(n, std::memory_order_release); + head($) = n; + ec.signal(); + } + + T dequeue() + { + T data = try_dequeue(); + while (0 == data) + { + int cmp = ec.get(); + data = try_dequeue(); + if (data) + break; + ec.wait(cmp); + data = try_dequeue(); + if (data) + break; + } + return data; + } + +private: + struct node + { + std::atomic next; + rl::var data; + + node(T data = T()) + : data(data) + { + next($) = 0; + } + }; + + rl::var head; + rl::var tail; + + eventcount ec; + + T try_dequeue() + { + node* t = tail($); + node* n = t->next.load(std::memory_order_acquire); + if (0 == n) + return 0; + T data = n->data($); + delete (t); + tail($) = n; + return data; + } +}; diff --git a/benchmark/spsc-bugfix/queue.h b/benchmark/spsc-bugfix/queue.h new file mode 100644 index 0000000..802d455 --- /dev/null +++ b/benchmark/spsc-bugfix/queue.h @@ -0,0 +1,129 @@ +#include +#include + +#include "eventcount.h" + +template +class spsc_queue +{ + /** + @Begin + @Global_define: + typedef struct tag_elem { + Tag id; + T data; + } tag_elem_t; + + Tag tag; + spec_queue __queue; + + static bool _is_elem_equals(void *ptr1, void *ptr2) { + // ... + // return if the elements pointed to are equal + } + + @Happens-before: + Enqueue -> Dequeue + @End + **/ +public: + spsc_queue() + { + node* n = new node (); + head = n; + tail = n; + } + + ~spsc_queue() + { + RL_ASSERT(head == tail); + delete ((node*)head($)); + } + + /** + @Begin + @Commit_point_set: + Enqueue = Enqueue_Success_Point + @ID: tag.current() + @Action: + __queue.enqueue(tag_elem_t(tag.current(), node(data)); + tag.next(); + @End + */ + void enqueue(T data) + { + node* n = new node (data); + head($)->next.store(n, std::memory_order_release); + /** + @Begin + @Commit_point_define: true + @Label: Enqueue_Success_Point + @End + */ + head = n; + // #Mark delete this + ec.signal(); + } + + /** + @Begin + @Commit_point_set: + Dequeue = Try_Dequeue_Success_Point + @ID: __queue.peak().tag + @Check: __queue.size() > 0 && _is_elem_equals(&RET, &__queue.peek().data) + @Action: __queue.dequeue(); + @End + */ + T dequeue() + { + T data = try_dequeue(); + while (0 == data) + { + int cmp = ec.get(); + data = try_dequeue(); + if (data) + break; + ec.wait(cmp); + data = try_dequeue(); + if (data) + break; + } + return data; + } + +private: + struct node + { + std::atomic next; + rl::var data; + + node(T data = T()) + : data(data) + { + next = 0; + } + }; + + rl::var head; + rl::var tail; + + eventcount ec; + + T try_dequeue() + { + node* t = tail($); + node* n = t->next.load(std::memory_order_acquire); + /** + @Begin + @Commit_point_define: ATOMIC_RET != NULL + @Label: Try_Dequeue_Success_Point + @End + */ + if (0 == n) + return 0; + T data = n->data($); + delete (t); + tail = n; + return data; + } +}; diff --git a/benchmark/spsc-bugfix/spsc-queue.cc b/benchmark/spsc-bugfix/spsc-queue.cc new file mode 100644 index 0000000..f8528a8 --- /dev/null +++ b/benchmark/spsc-bugfix/spsc-queue.cc @@ -0,0 +1,34 @@ +#include + +#include "queue.h" + +spsc_queue *q; + + void thread(unsigned thread_index) + { + if (0 == thread_index) + { + q->enqueue(11); + } + else + { + int d = q->dequeue(); + RL_ASSERT(11 == d); + } + } + +int user_main(int argc, char **argv) +{ + thrd_t A, B; + + q = new spsc_queue(); + + thrd_create(&A, (thrd_start_t)&thread, (void *)0); + thrd_create(&B, (thrd_start_t)&thread, (void *)1); + thrd_join(A); + thrd_join(B); + + delete q; + + return 0; +} diff --git a/benchmark/spsc-bugfix/spsc-relacy.cc b/benchmark/spsc-bugfix/spsc-relacy.cc new file mode 100644 index 0000000..8d4ad3b --- /dev/null +++ b/benchmark/spsc-bugfix/spsc-relacy.cc @@ -0,0 +1,29 @@ +#include + +#include "queue-relacy.h" + +struct spsc_queue_test : rl::test_suite +{ + spsc_queue q; + + void thread(unsigned thread_index) + { + if (0 == thread_index) + { + q.enqueue(11); + } + else + { + int d = q.dequeue(); + RL_ASSERT(11 == d); + } + } +}; + + +int main() +{ + rl::test_params params; + params.search_type = rl::fair_full_search_scheduler_type; + rl::simulate(params); +} diff --git a/grammer/spec_compiler.jj b/grammer/spec_compiler.jj index f914094..150ad94 100644 --- a/grammer/spec_compiler.jj +++ b/grammer/spec_compiler.jj @@ -206,6 +206,55 @@ import edu.uci.eecs.specCompiler.specExtraction.VariableDeclaration; return sb.toString(); } + /** + boolean spaceSeparator(Token t) { + switch (t.image) { + case "[": + case "]": + case "=": + case "(": + case ")": + case ",": + case ".": + case "*": + case "~": + case "!": + case "&": + case "|": + case "%": + case "+": + case "-": + case "/": + case "<": + case ">": + case "<=": + case ">=": + case "==": + case "!=": + case "&&": + case "||": + case "^": + case "?": + case ":": + case "::": + case "<<": + case ">>": + case ">>>": + case "+=": + case "-=": + case "*=": + case "/=": + case "%=": + case "^=": + case "&=": + case ";": + return false; + default: + return true; + } + } + */ + } PARSER_END(SpecParser) @@ -334,6 +383,8 @@ SKIP : { | +| + | <#DIGIT: ["0"-"9"]> | @@ -675,12 +726,16 @@ VariableDeclaration TypeParam() : } } + + ArrayList C_CPP_CODE(ArrayList headers) : { String text; Token t; boolean newLine = false; + boolean newSpace = true; boolean inTemplate = false; + boolean inForLoop = false; ArrayList content; String header; } @@ -696,6 +751,7 @@ ArrayList C_CPP_CODE(ArrayList headers) : t = | t = | t = | t = | (t =