void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
{
int32_t *bin = queue->write_prepare();
- store_32(bin, 1);
+ //store_32(bin, 1);
*bin = 1;
printf("write_bin %d, val %d\n", bin, 1);
queue->write_publish(bin);
{
int32_t *bin;
while (bin = queue->read_fetch()) {
- printf("Read: %d\n", load_32(bin));
- printf("read_bin %d, val %d\n", bin, load_32(bin));
+ //printf("Read: %d\n", load_32(bin));
+ //printf("read_bin %d, val %d\n", bin, load_32(bin));
printf("Read: %d\n", *bin);
queue->read_consume(bin);
}
void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
{
int32_t *bin = queue->write_prepare();
- store_32(bin, 1);
+ //store_32(bin, 1);
*bin = 1;
queue->write_publish(bin);
while (bin = queue->read_fetch()) {
- printf("Read: %d\n", load_32(bin));
+ //printf("Read: %d\n", load_32(bin));
printf("Read: %d\n", *bin);
queue->read_consume(bin);
}
#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
printf("Adding initial element\n");
int32_t *bin = queue.write_prepare();
- store_32(bin, 17);
+ //store_32(bin, 17);
*bin, 17;
printf("init_write_bin %d, val %d\n", bin, 17);
queue.write_publish(bin);
/**
@Begin
@Interface: Fetch
- @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
+ @Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load
@ID: (call_id_t) __RET__
//@Check:
//__RET__ == NULL || has_elem(list, __RET__)
/**
@Begin
@Potential_commit_point_define: true
- @Label: Fetch_Potential_Point
+ @Label: Fetch_Potential_RW_Load
@End
*/
unsigned int rd,wr;
/**
@Begin
@Commit_point_define: true
- @Potential_commit_point_label: Fetch_Potential_Point
- @Label: Fetch_Empty_Point
+ @Potential_commit_point_label: Fetch_Potential_RW_Load
+ @Label: Fetch_RW_Load_Empty
@End
*/
bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
/**
@Begin
- @Commit_point_define_check: succ == true
- @Label: Fetch_Succ_Point
+ @Commit_point_define_check: succ
+ @Label: Fetch_RW_RMW
@End
*/
if (succ)
// (*1)
rl::backoff bo;
- while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
- thrd_yield();
+ while (true) {
+ int written = m_written.load(mo_acquire);
+ /**
+ @Begin
+ @Potential_commit_point_define: true
+ @Label: Fetch_Potential_W_Load
+ @End
+ */
+ if ((written & 0xFFFF) != wr) {
+ thrd_yield();
+ } else {
+ break;
+ }
}
+
+ /**
+ @Begin
+ @Commit_point_define: true
+ @Potential_commit_point_label: Fetch_Potential_W_Load
+ @Label: Fetch_W_Load
+ @End
+ */
t_element * p = & ( m_array[ rd % t_size ] );
/**
@Begin
@Interface: Consume
- @Commit_point_set: Consume_Point
+ @Commit_point_set: Consume_R_RMW
@ID: (call_id_t) bin
//@Check:
// consume_check(__TID__)
/**
@Begin
@Commit_point_define_check: true
- @Label: Consume_Point
+ @Label: Consume_R_RMW
@End
*/
}
/**
@Begin
@Interface: Prepare
- @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
+ @Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load
@ID: (call_id_t) __RET__
//@Check:
//prepare_check(__RET__, __TID__)
/**
@Begin
@Potential_commit_point_define: true
- @Label: Prepare_Potential_Point
+ @Label: Prepare_Potential_RW_Load
@End
*/
unsigned int rd,wr;
/**
@Begin
@Commit_point_define: true
- @Potential_commit_point_label: Prepare_Potential_Point
- @Label: Prepare_Full_Point
+ @Potential_commit_point_label: Prepare_Potential_RW_Load
+ @Label: Prepare_RW_Load_Full
@End
*/
return NULL;
((wr+1)&0xFFFF),mo_acq_rel);
/**
@Begin
- @Commit_point_define_check: succ == true
- @Label: Prepare_Succ_Point
+ @Commit_point_define_check: succ
+ @Label: Prepare_RW_RMW
@End
*/
if (succ)
// (*1)
rl::backoff bo;
- while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
- thrd_yield();
+ while (true) {
+ int read = m_read.load(mo_acquire);
+ /**
+ @Begin
+ @Potential_commit_point_define: true
+ @Label: Prepare_Potential_R_Load
+ @End
+ */
+ if ((read & 0xFFFF) != rd)
+ thrd_yield();
+ else
+ break;
}
+ /**
+ @Begin
+ @Commit_point_define: true
+ @Potential_commit_point_label: Prepare_Potential_R_Load
+ @Label: Prepare_R_Load
+ @End
+ */
+
t_element * p = & ( m_array[ wr % t_size ] );
return p;
/**
@Begin
@Interface: Publish
- @Commit_point_set: Publish_Point
+ @Commit_point_set: Publish_W_RMW
@ID: (call_id_t) bin
//@Check:
//publish_check(__TID__)
/**
@Begin
@Commit_point_define_check: true
- @Label: Publish_Point
+ @Label: Publish_W_RMW
@End
*/
}
--- /dev/null
+#1: The new bug we found in this data structure is as the following:
+
+In dequeue(), the load of Tail has to be acquire. If not, the load of the next
+field won't get synchronize with the enqueuer that inserted the element right
+after this dequeue() operation. We can catch this bug using the two independent
+dequeuers and 1 (or 2) enqueuer.
+
+#2: When writing the speicification for this data structure, we have been
+considering the following invariance of the enqueue() and dequeue() operations.
+For enqueue() to succeed, it finally loads the Tail, inserts the new element by
+updating the next field right after the Tail load. The load of Tail actually
+orders the enqueuers, and the CAS of the next field will order the enqueuer and
+its dequeuer. For dequeue() to succeed, it either loads the Head and Tail, and
+then bails; or it loads the Head and then loads the next field. Similarly, the
+load of Head orders dequeuers.
+
+We have one key "additional_ordering_point" in enqueue() method for ordering the
+dequeuer and its immediately next enqueuer. For example, when we have two
+threads, 1 dequeuer and 1 enqueuer, when dequeue() returns NULL, we can order
+dequeue() before enqueue() by using the additional ordering point of enqueue()
+where the swing of Tail (or just the load of Tail when someone else swings it).
+Since dequeue() will load the Tail, and enqueue() will CAS or load the Tail
+later, deuqeue() can be ordered() before enqueue(). Notably here, additional
+ordering points are only used when methods has 'same-location' commit points and
+they can't order each other.
// File[][] sources = { srcLinuxRWLocks, srcMSQueue, srcRCU,
// srcDeque, srcMCSLock, srcSPSCQueue, srcMPMCQueue, srcHashtable };
- File[][] sources = {srcMSQueue, srcHashtable };
+ File[][] sources = {srcMSQueue, srcMPMCQueue };
// Compile all the benchmarks
for (int i = 0; i < sources.length; i++) {
CodeGenerator gen = new CodeGenerator(sources[i]);