for (i = 2; i < real_size; i++) {
_data[i].store(NULL, memory_order_relaxed);
}
- _data[1].store(hashes, memory_order_release);
+ _data[1].store(hashes, memory_order_relaxed);
}
~kvs_data() {
_slots.store(0, memory_order_relaxed);
_copy_idx.store(0, memory_order_relaxed);
- _copy_done.store(0, memory_order_release);
+ _copy_done.store(0, memory_order_relaxed);
}
~CHM() {}
}
kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
- //model_print("resizing...\n");
+ model_print("resizing...\n");
kvs_data *newkvs = _newkvs.load(memory_order_acquire);
if (newkvs != NULL)
return newkvs;
// Last check cause the 'new' below is expensive
newkvs = _newkvs.load(memory_order_acquire);
+ model_print("hey1\n");
if (newkvs != NULL) return newkvs;
newkvs = new kvs_data(newsz);
void *chm = (void*) new CHM(sz);
- newkvs->_data[0].store(chm, memory_order_release);
+ model_print("hey2\n");
+ newkvs->_data[0].store(chm, memory_order_relaxed);
kvs_data *cur_newkvs;
// Another check after the slow allocation
// Just follow Cliff Click's code here
int panic_start = -1;
int copyidx;
- while (_copy_done.load(memory_order_acquire) < oldlen) {
- copyidx = _copy_idx.load(memory_order_acquire);
+ while (_copy_done.load(memory_order_relaxed) < oldlen) {
+ copyidx = _copy_idx.load(memory_order_relaxed);
if (panic_start == -1) { // No panic
- copyidx = _copy_idx.load(memory_order_acquire);
+ copyidx = _copy_idx.load(memory_order_relaxed);
while (copyidx < (oldlen << 1) &&
!_copy_idx.compare_exchange_strong(copyidx, copyidx +
min_copy_work, memory_order_release, memory_order_relaxed))
kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
*oldkvs, int idx, void *should_help) {
- kvs_data *newkvs = _newkvs.load(memory_order_acquire);
+ kvs_data *newkvs = _newkvs.load(memory_order_relaxed);
// We're only here cause the caller saw a Prime
if (copy_slot(topmap, idx, oldkvs, newkvs))
copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
// Promote the new table to the current table
if (copyDone + workdone == oldlen &&
- topmap->_kvs.load(memory_order_acquire) == oldkvs) {
- kvs_data *newkvs = _newkvs.load(memory_order_acquire);
+ topmap->_kvs.load(memory_order_relaxed) == oldkvs) {
+ kvs_data *newkvs = _newkvs.load(memory_order_relaxed);
topmap->_kvs.compare_exchange_strong(oldkvs, newkvs, memory_order_release,
memory_order_relaxed);
}
kvs_data *kvs = new kvs_data(Default_Init_Size);
void *chm = (void*) new CHM(0);
kvs->_data[0].store(chm, memory_order_relaxed);
- _kvs.store(kvs, memory_order_release);
+ _kvs.store(kvs, memory_order_relaxed);
}
cliffc_hashtable(int init_size) {
kvs_data *kvs = new kvs_data(init_size);
void *chm = (void*) new CHM(0);
kvs->_data[0].store(chm, memory_order_relaxed);
- _kvs.store(kvs, memory_order_release);
+ _kvs.store(kvs, memory_order_relaxed);
}
/**
@Commit_point_set: Get_Success_Point1 | Get_Success_Point2 | Get_Success_Point3
@ID: getKeyTag(key)
@Action:
- void *_Old_Val = spec_table_get(map, key);
+ TypeV *_Old_Val = (TypeV*) spec_table_get(map, key);
+ //bool passed = equals_val(_Old_Val, __RET__);
+ bool passed = false;
+ if (!passed) {
+ int old = _Old_Val == NULL ? 0 : _Old_Val->_val;
+ int ret = __RET__ == NULL ? 0 : __RET__->_val;
+ model_print("Get: key: %d, _Old_Val: %d, RET: %d\n",
+ key->_val, old, ret);
+ }
@Post_check:
- __RET__ == NULL ? true : equals_val(_Old_Val, __RET__)
+ //__RET__ == NULL ? true : equals_val(_Old_Val, __RET__)
+ equals_val(_Old_Val, __RET__)
@End
*/
TypeV* get(TypeK *key) {
slot *key_slot = new slot(false, key);
int fullhash = hash(key_slot);
- kvs_data *kvs = _kvs.load(memory_order_acquire);
+ kvs_data *kvs = _kvs.load(memory_order_relaxed);
slot *V = get_impl(this, kvs, key_slot, fullhash);
if (V == NULL) return NULL;
MODEL_ASSERT (!is_prime(V));
@ID: getKeyTag(key)
@Action:
# Remember this old value at checking point
- void *_Old_Val = spec_table_get(map, key);
+ TypeV *_Old_Val = (TypeV*) spec_table_get(map, key);
spec_table_put(map, key, val);
+ //bool passed = equals_val(__RET__, _Old_Val);
+ bool passed = false;
+ if (!passed) {
+ int old = _Old_Val == NULL ? 0 : _Old_Val->_val;
+ int ret = __RET__ == NULL ? 0 : __RET__->_val;
+ model_print("Put: key: %d, val: %d, _Old_Val: %d, RET: %d\n",
+ key->_val, val->_val, old, ret);
+ }
@Post_check:
equals_val(__RET__, _Old_Val)
@End
MODEL_ASSERT (idx >= 0 && idx < kvs->_size);
// Corresponding to the volatile read in get_impl() and putIfMatch in
// Cliff Click's Java implementation
- slot *res = (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
+ slot *res = (slot*) kvs->_data[idx * 2 + 2].load(memory_order_relaxed);
+ /**
+ @Begin
+ # This is a complicated potential commit point since many many functions are
+ # calling val().
+ @Potential_commit_point_define: true
+ @Label: Read_Key_Point
+ @End
+ */
return res;
}
// inserted keys
static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
- desired, memory_order_release, memory_order_relaxed);
+ desired, memory_order_relaxed, memory_order_relaxed);
}
/**
# If it is a successful put instead of a copy or any other internal
# operations, expected != NULL
@Begin
- @Potential_commit_point_define: res == true
+ @Potential_commit_point_define: res
@Label: Write_Val_Point
@End
*/
int reprobe_cnt = 0;
while (true) {
slot *K = key(kvs, idx);
- slot *V = val(kvs, idx);
/**
@Begin
@Commit_point_define: K == NULL
- @Potential_commit_point_label: Read_Val_Point
+ @Potential_commit_point_label: Read_Key_Point
@Label: Get_Success_Point_1
@End
*/
+ slot *V = val(kvs, idx);
+
- if (K == NULL) return NULL; // A miss
+ if (K == NULL) {
+ model_print("Key is null\n");
+ return NULL; // A miss
+ }
if (keyeq(K, key_slot, hashes, idx, fullhash)) {
// Key hit! Check if table-resize in progress
if (++reprobe_cnt >= REPROBE_LIMIT ||
key_slot == TOMBSTONE) {
// Retry in new table
- // Atomic read (acquire) can be here
- kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
+ // Atomic read can be here
+ kvs_data *newkvs = chm->_newkvs.load(memory_order_relaxed);
/**
@Begin
@Commit_point_define_check: newkvs == NULL
slot *key_slot = new slot(false, key);
slot *value_slot = new slot(false, value);
- kvs_data *kvs = _kvs.load(memory_order_acquire);
+ kvs_data *kvs = _kvs.load(memory_order_relaxed);
slot *res = putIfMatch(this, kvs, key_slot, value_slot, old_val);
// Only when copy_slot() call putIfMatch() will it return NULL
MODEL_ASSERT (res != NULL);
// Here it tries to resize cause it doesn't want other threads to stop
// its progress (eagerly try to resize soon)
- newkvs = chm->_newkvs.load(memory_order_acquire);
+ newkvs = chm->_newkvs.load(memory_order_relaxed);
if (newkvs == NULL &&
((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V))) {
//model_print("resize2\n");
// Help along an existing table-resize. This is a fast cut-out wrapper.
kvs_data* help_copy(kvs_data *helper) {
- kvs_data *topkvs = _kvs.load(memory_order_acquire);
+ kvs_data *topkvs = _kvs.load(memory_order_relaxed);
CHM *topchm = get_chm(topkvs);
// No copy in progress
- if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
+ if (topchm->_newkvs.load(memory_order_relaxed) == NULL) return helper;
topchm->help_copy_impl(this, topkvs, false);
return helper;
}
cliffc_hashtable<IntWrapper, IntWrapper> *table;
IntWrapper *val1, *val2;
-IntWrapper *k1, *k2, *k3, *k4, *k5;
-IntWrapper *v1, *v2, *v3, *v4, *v5;
+IntWrapper *k0, *k1, *k2, *k3, *k4, *k5;
+IntWrapper *v0, *v1, *v2, *v3, *v4, *v5;
void threadA(void *arg) {
table->put(k1, v1);
if (val1 != NULL)
model_print("val1: %d\n", val1->_val);
else
- model_print("val1: NULL\n");
- */
+ model_print("val1: NULL\n");*/
//table->put(k3, v3);
}
void threadB(void *arg) {
+ table->put(k1, v1);
+ table->put(k2, v4);
+ table->put(k3, v3);
}
void threadMain(void *arg) {
- //table->put(k3, v3);
- val2 = table->get(k1);
+ val1 = table->get(k1);
+ val2 = table->get(k2);
+ if (val1 != NULL)
+ model_print("val1: %d\n", val1->_val);
+ else
+ model_print("val1: NULL\n");
if (val2 != NULL)
- model_print("val2: %d\n", val1->_val);
+ model_print("val2: %d\n", val2->_val);
else
model_print("val2: NULL\n");
}
int user_main(int argc, char *argv[]) {
thrd_t t1, t2;
- table = new cliffc_hashtable<IntWrapper, IntWrapper>(2);
+ table = new cliffc_hashtable<IntWrapper, IntWrapper>(16);
k1 = new IntWrapper(3);
k2 = new IntWrapper(5);
k3 = new IntWrapper(11);
v4 = new IntWrapper(81);
v5 = new IntWrapper(99);
+ v0 = new IntWrapper(2048);
+ table->put(k1, v0);
+ table->put(k2, v0);
+ model_print("hey\n");
thrd_create(&t1, threadA, NULL);
thrd_create(&t2, threadB, NULL);
threadMain(NULL);
{
int32_t *bin = queue->write_prepare();
store_32(bin, 1);
- *bin = 1;
printf("write_bin %d, val %d\n", bin, 1);
- queue->write_publish(bin);
+ queue->write_publish();
}
void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
while (bin = queue->read_fetch()) {
printf("Read: %d\n", load_32(bin));
printf("read_bin %d, val %d\n", bin, load_32(bin));
- printf("Read: %d\n", *bin);
- queue->read_consume(bin);
+ queue->read_consume();
}
}
{
int32_t *bin = queue->write_prepare();
store_32(bin, 1);
- *bin = 1;
- queue->write_publish(bin);
+ queue->write_publish();
while (bin = queue->read_fetch()) {
printf("Read: %d\n", load_32(bin));
- printf("Read: %d\n", *bin);
- queue->read_consume(bin);
+ queue->read_consume();
}
}
int user_main(int argc, char **argv)
{
- struct mpmc_boundq_1_alt<int32_t, 2> queue;
+ struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
/* Note: optarg() / optind is broken in model-checker - workaround is
printf("Adding initial element\n");
int32_t *bin = queue.write_prepare();
store_32(bin, 17);
- *bin, 17;
printf("init_write_bin %d, val %d\n", bin, 17);
- queue.write_publish(bin);
+ queue.write_publish();
#endif
printf("Start threads\n");
#include <unrelacy.h>
#include <common.h>
-#include <spec_lib.h>
-#include <stdlib.h>
-#include <cdsannotate.h>
-#include <specannotation.h>
-#include <model_memory.h>
-
/**
@Begin
@Class_begin
} elem;
@DeclareVar:
spec_list *list;
- //id_tag_t *tag;
+ id_tag_t *tag;
@InitVar:
list = new_spec_list();
- //tag = new_id_tag();
+ tag = new_id_tag();
+ @DefineFunc:
+ // Allocates a spec-list element recording that thread 'tid' prepared
+ // buffer position 'pos' under call id 'id'.  'written' starts false
+ // (set by publish()) and 'fetch_tid' starts -1 (set by fetch()).
+ elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
+ elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
+ e->pos = pos;
+ e->written = false;
+ e->id = id;
+ e->tid = tid;
+ e->fetch_tid = -1;
+ // FIX: the result is used by prepare(); falling off the end of a
+ // non-void function and reading the value is undefined behavior.
+ return e;
+ }
+ @DefineFunc:
+ // Linear scan of the spec list for the element recorded at buffer
+ // position 'pos'; returns NULL when no element matches.
+ elem* get_elem_by_pos(t_element *pos) {
+ for (int i = 0; i < size(list); i++) {
+ elem *e = (elem*) elem_at_index(list, i);
+ if (e->pos == pos) {
+ return e;
+ }
+ }
+ return NULL;
+ }
+ @DefineFunc:
+ // Debug aid: walks the whole spec list (printing is currently
+ // commented out, so this is effectively a no-op traversal).
+ void show_list() {
+ //model_print("Status:\n");
+ for (int i = 0; i < size(list); i++) {
+ elem *e = (elem*) elem_at_index(list, i);
+ //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id);
+ }
+ }
+ @DefineFunc:
+ // Finds the element prepared by thread 'tid' (first match);
+ // returns NULL when that thread has no outstanding element.
+ elem* get_elem_by_tid(thread_id_t tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *e = (elem*) elem_at_index(list, i);
+ if (e->tid== tid) {
+ return e;
+ }
+ }
+ return NULL;
+ }
+ @DefineFunc:
+ // Finds the element most recently fetched by reader thread
+ // 'fetch_tid' (first match); NULL when none was fetched by it.
+ elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *e = (elem*) elem_at_index(list, i);
+ if (e->fetch_tid== fetch_tid) {
+ return e;
+ }
+ }
+ return NULL;
+ }
+ @DefineFunc:
+ // Index (not pointer) of the element recorded at 'pos', or -1.
+ int elem_idx_by_pos(t_element *pos) {
+ for (int i = 0; i < size(list); i++) {
+ elem *existing = (elem*) elem_at_index(list, i);
+ if (pos == existing->pos) {
+ return i;
+ }
+ }
+ return -1;
+ }
+ @DefineFunc:
+ // Index of the element prepared by thread 'tid', or -1.
+ int elem_idx_by_tid(thread_id_t tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *existing = (elem*) elem_at_index(list, i);
+ if (tid == existing->tid) {
+ return i;
+ }
+ }
+ return -1;
+ }
+ @DefineFunc:
+ // Index of the element fetched by reader 'fetch_tid', or -1.
+ // Used by consume() so the element can be removed by index.
+ int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *existing = (elem*) elem_at_index(list, i);
+ if (fetch_tid == existing->fetch_tid) {
+ return i;
+ }
+ }
+ return -1;
+ }
+ @DefineFunc:
+ // Counts how many list elements reference position 'pos'; the
+ // *_check() helpers treat a count > 1 as a specification violation
+ // (the same buffer slot claimed by more than one operation).
+ int elem_num(t_element *pos) {
+ int cnt = 0;
+ for (int i = 0; i < size(list); i++) {
+ elem *existing = (elem*) elem_at_index(list, i);
+ if (pos == existing->pos) {
+ cnt++;
+ }
+ }
+ return cnt;
+ }
+ @DefineFunc:
+ // Issues a fresh call id for a Prepare operation from the shared tag
+ // counter.
+ call_id_t prepare_id() {
+ call_id_t res = get_and_inc(tag);
+ //model_print("prepare_id: %d\n", res);
+ return res;
+ }
+ @DefineFunc:
+ // A prepare is admissible only if no element already occupies 'pos'.
+ // ('tid' is currently unused here.)
+ bool prepare_check(t_element *pos, thread_id_t tid) {
+ show_list();
+ elem *e = get_elem_by_pos(pos);
+ //model_print("prepare_check: e %d\n", e);
+ return NULL == e;
+ }
+ @DefineFunc:
+ // Records the prepared slot: appends a new (pos, id, tid) element to
+ // the spec list.
+ void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
+ //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
+ elem *e = new_elem(pos, id, tid);
+ push_back(list, e);
+ }
+ @DefineFunc:
+ // Call id for a Publish by 'tid': the id assigned when that thread
+ // prepared its slot, or DEFAULT_CALL_ID if it never prepared one.
+ call_id_t publish_id(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ // Publish is admissible when the thread has a prepared element, the
+ // slot is not multiply claimed, and it was not already published.
+ bool publish_check(thread_id_t tid) {
+ show_list();
+ elem *e = get_elem_by_tid(tid);
+ //model_print("publish_check: tid %d\n", tid);
+ if (NULL == e)
+ return false;
+ if (elem_num(e->pos) > 1)
+ return false;
+ return !e->written;
+ }
+ @DefineFunc:
+ // Marks this thread's prepared element as written (published).
+ // NOTE(review): assumes publish_check() ran first so e != NULL.
+ void publish(thread_id_t tid) {
+ //model_print("publish: tid %d\n", tid);
+ elem *e = get_elem_by_tid(tid);
+ e->written = true;
+ }
+ @DefineFunc:
+ // Call id for a Fetch returning 'pos': the id of the element at that
+ // position, or DEFAULT_CALL_ID when the position is unknown.
+ call_id_t fetch_id(t_element *pos) {
+ elem *e = get_elem_by_pos(pos);
+ //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ // A NULL fetch (empty queue) is always admissible; a non-NULL fetch
+ // requires a uniquely-claimed element recorded at 'pos'.
+ bool fetch_check(t_element *pos) {
+ show_list();
+ if (pos == NULL) return true;
+ elem *e = get_elem_by_pos(pos);
+ //model_print("fetch_check: pos %d, e %d\n", pos, e);
+ if (e == NULL) return false;
+ if (elem_num(e->pos) > 1)
+ return false;
+ return true;
+ }
+ @DefineFunc:
+ // Records that reader 'tid' fetched position 'pos'; consume_* later
+ // look the element up by this fetch_tid.  No-op on a NULL fetch.
+ void fetch(t_element *pos, thread_id_t tid) {
+ if (pos == NULL) return;
+ elem *e = (elem*) get_elem_by_pos(pos);
+ //model_print("fetch: pos %d, tid %d\n", pos, tid);
+ // Remember the thread that fetches the position
+ e->fetch_tid = tid;
+ }
+ @DefineFunc:
+ // Consume is admissible when this reader fetched an element, the slot
+ // is uniquely claimed, and the element was published (written).
+ bool consume_check(thread_id_t tid) {
+ show_list();
+ elem *e = get_elem_by_fetch_tid(tid);
+ //model_print("consume_check: tid %d, e %d\n", tid, e);
+ if (NULL == e)
+ return false;
+ if (elem_num(e->pos) > 1)
+ return false;
+ return e->written;
+ }
+ @DefineFunc:
+ // Call id for a Consume by 'tid': the id of the element this reader
+ // fetched, or DEFAULT_CALL_ID when none was fetched.
+ call_id_t consume_id(thread_id_t tid) {
+ elem *e = get_elem_by_fetch_tid(tid);
+ //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ // Removes the consumed element from the spec list (by index, found
+ // via the reader's fetch_tid); silently ignores an unmatched tid.
+ void consume(thread_id_t tid) {
+ //model_print("consume: tid %d\n", tid);
+ int idx = elem_idx_by_fetch_tid(tid);
+ if (idx == -1)
+ return;
+ remove_at_index(list, idx);
+ }
@Happens_before:
- Publish -> Fetch
- Consume -> Prepare
+ Prepare -> Fetch
+ Publish -> Consume
@End
*/
@Begin
@Interface: Fetch
@Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
- @ID: (call_id_t) __RET__
- //@Check:
- //__RET__ == NULL || has_elem(list, __RET__)
+ @ID: fetch_id(__RET__)
+ @Check:
+ fetch_check(__RET__)
+ @Action:
+ fetch(__RET__, __TID__);
@End
*/
t_element * read_fetch() {
- // Try this new weaker semantics
unsigned int rdwr = m_rdwr.load(mo_acquire);
- //unsigned int rdwr = m_rdwr.load(mo_relaxed);
/**
@Begin
@Potential_commit_point_define: true
wr = rdwr & 0xFFFF;
if ( wr == rd ) { // empty
-
/**
@Begin
@Commit_point_define: true
@Label: Fetch_Empty_Point
@End
*/
-
return false;
}
@Begin
@Interface: Consume
@Commit_point_set: Consume_Point
- @ID: (call_id_t) bin
- //@Check:
- // consume_check(__TID__)
- //@Action:
- //consume(__TID__);
+ @ID: consume_id(__TID__)
+ @Check:
+ consume_check(__TID__)
+ @Action:
+ consume(__TID__);
@End
*/
- void read_consume(t_element *bin) {
- /**** FIXME: miss ****/
+ void read_consume() {
m_read.fetch_add(1,mo_release);
/**
@Begin
@Begin
@Interface: Prepare
@Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
- @ID: (call_id_t) __RET__
- //@Check:
- //prepare_check(__RET__, __TID__)
- //@Action:
- //push_back(list, __RET__);
+ @ID: prepare_id()
+ @Check:
+ prepare_check(__RET__, __TID__)
+ @Action:
+ prepare(__ID__, __RET__, __TID__);
@End
*/
t_element * write_prepare() {
- // Try weaker semantics
unsigned int rdwr = m_rdwr.load(mo_acquire);
- //unsigned int rdwr = m_rdwr.load(mo_relaxed);
/**
@Begin
@Potential_commit_point_define: true
wr = rdwr & 0xFFFF;
if ( wr == ((rd + t_size)&0xFFFF) ) { // full
-
/**
@Begin
@Commit_point_define: true
@Begin
@Interface: Publish
@Commit_point_set: Publish_Point
- @ID: (call_id_t) bin
- //@Check:
- //publish_check(__TID__)
- //@Action:
- //publish(__TID__);
+ @ID: publish_id(__TID__)
+ @Check:
+ publish_check(__TID__)
+ @Action:
+ publish(__TID__);
@End
*/
- void write_publish(t_element *bin)
+ void write_publish()
{
- /**** hb violation ****/
m_written.fetch_add(1,mo_release);
/**
@Begin
if (!pid) {
input[0] = 17;
- enqueue(queue, input[0]);
+ //enqueue(queue, input[0]);
enqueue(queue, input[0]);
output[0] = dequeue(queue);
} else {
input[1] = 37;
enqueue(queue, input[1]);
//output[1] = dequeue(queue);
- output[0] = dequeue(queue);
+ //output[0] = dequeue(queue);
output[0] = dequeue(queue);
}
}