// fixed mpmc spec
// [cdsspec-compiler.git] / benchmark / cliffc-hashtable / cliffc_hashtable.h
1 #ifndef CLIFFC_HASHTABLE_H
2 #define CLIFFC_HASHTABLE_H
3
4 #include <iostream>
5 #include <atomic>
6 #include <common.h>
7 #include <model-assert.h>
8
9 using namespace std;
10
11 /**
12         This header file declares and defines a simplified version of Cliff Click's
	NonblockingHashMap. It contains all the necessary structures and main
14         functions. In simplified_cliffc_hashtable.cc file, it has the definition for
15         the static fields.
16 */
17
18 template<typename TypeK, typename TypeV>
19 class cliffc_hashtable;
20
21 /**
	Corresponding to the Object[] array in Cliff Click's Java implementation.
23         It keeps the first two slots for CHM (Hashtable control unit) and the hash
24         records (an array of hash used for fast negative key-equality check).
25 */
26 struct kvs_data {
27         int _size;
28         atomic<void*> *_data;
29         
30         kvs_data(int sz) {
31                 _size = sz;
32                 int real_size = sz * 2 + 2;
33                 _data = new atomic<void*>[real_size];
34                 // The control block should be initialized in resize()
35                 // Init the hash record array
36                 int *hashes = new int[_size];
37                 int i;
38                 for (i = 0; i < _size; i++) {
39                         hashes[i] = 0;
40                 }
41                 // Init the data to Null slot
42                 for (i = 2; i < real_size; i++) {
43                         _data[i].store(NULL, memory_order_relaxed);
44                 }
45                 _data[1].store(hashes, memory_order_release);
46         }
47
48         ~kvs_data() {
49                 int *hashes = (int*) _data[1].load(memory_order_relaxed);
50                 delete hashes;
51                 delete[] _data;
52         }
53 };
54
55 struct slot {
56         bool _prime;
57         void *_ptr;
58
59         slot(bool prime, void *ptr) {
60                 _prime = prime;
61                 _ptr = ptr;
62         }
63 };
64
65
66 /**
67         TypeK must have defined function "int hashCode()" which return the hash
68         code for the its object, and "int equals(TypeK anotherKey)" which is
69         used to judge equality.
70         TypeK and TypeV should define their own copy constructor.
71 */
72 template<typename TypeK, typename TypeV>
73 class cliffc_hashtable {
74         /**
75                 # The synchronization we have for the hashtable gives us the property of
76                 # serializability, so we should have a sequential hashtable when we check the
77                 # correctness. The key thing is to identify all the commit point.
78         
79                 @Begin
80                 @Options:
81                         LANG = C;
82                         CLASS = cliffc_hashtable;
83                 @Global_define:
84                         @DeclareVar:
85                         spec_hashtable<TypeK, TypeV*> map;
86                         spec_hashtable<TypeK, Tag> id_map;
87                         Tag tag;
88                         @InitVar:
89                                 map = spec_hashtable<TypeK, TypeV*>();
				id_map = spec_hashtable<TypeK, Tag>();
91                                 tag = Tag();
92                         @DefineFunc:
93                         bool equals_val(TypeV *ptr1, TypeV *ptr2) {
94                                 // ...
95                         }
96                         
97                         @DefineFunc:
98                         # Update the tag for the current key slot if the corresponding tag
99                         # is NULL, otherwise just return that tag. It will update the next
100                         # available tag too if it requires a new tag for that key slot.
101                         Tag getKeyTag(TypeK &key) {
102                                 if (id_map.get(key) == NULL) {
103                                         Tag cur_tag = tag.current();
104                                         id_map.put(key, cur_tag);
105                                         tag.next();
106                                         return cur_tag;
107                                 } else {
108                                         return id_map.get(key);
109                                 }
110                         }
111                 
112                 @Interface_cluster:
113                         Read_interface = {
114                                 Get,
115                                 PutIfAbsent,
116                                 RemoveAny,
117                                 RemoveIfMatch,
118                                 ReplaceAny,
119                                 ReplaceIfMatch
120                         }
121                         
122                         Write_interface = {
123                                 Put,
124                                 PutIfAbsent(COND_PutIfAbsentSucc),
125                                 RemoveAny,
126                                 RemoveIfMatch(COND_RemoveIfMatchSucc),
127                                 ReplaceAny,
128                                 ReplaceIfMatch(COND_ReplaceIfMatchSucc)
129                         }
130                 @Happens_before:
131                         Write_interface -> Read_interface
132                 @End
133         */
134
135 friend class CHM;
136         /**
137                 The control structure for the hashtable
138         */
139         private:
140         class CHM {
141                 friend class cliffc_hashtable;
142                 private:
143                 atomic<kvs_data*> _newkvs;
144                 
145                 // Size of active K,V pairs
146                 atomic_int _size;
147         
148                 // Count of used slots
149                 atomic_int _slots;
150                 
151                 // The next part of the table to copy
152                 atomic_int _copy_idx;
153                 
154                 // Work-done reporting
155                 atomic_int _copy_done;
156         
157                 public:
158                 CHM(int size) {
159                         _newkvs.store(NULL, memory_order_relaxed);
160                         _size.store(size, memory_order_relaxed);
161                         _slots.store(0, memory_order_relaxed);
162         
163                         _copy_idx.store(0, memory_order_relaxed);
164                         _copy_done.store(0, memory_order_release);
165                 }
166         
167                 ~CHM() {}
168                 
169                 private:
170                         
171                 // Heuristic to decide if the table is too full
172                 bool table_full(int reprobe_cnt, int len) {
173                         return
174                                 reprobe_cnt >= REPROBE_LIMIT &&
175                                 _slots.load(memory_order_relaxed) >= reprobe_limit(len);
176                 }
177         
178                 kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
179                         //model_print("resizing...\n");
180                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
181                         if (newkvs != NULL)
182                                 return newkvs;
183         
184                         // No copy in-progress, start one; Only double the table size
185                         int oldlen = kvs->_size;
186                         int sz = _size.load(memory_order_relaxed);
187                         int newsz = sz;
188                         
189                         // Just follow Cliff Click's heuristic to decide the new size
190                         if (sz >= (oldlen >> 2)) { // If we are 25% full
191                                 newsz = oldlen << 1; // Double size
192                                 if (sz >= (oldlen >> 1))
193                                         newsz = oldlen << 2; // Double double size
194                         }
195         
196                         // We do not record the record timestamp
197                         if (newsz <= oldlen) newsz = oldlen << 1;
198                         // Do not shrink ever
199                         if (newsz < oldlen) newsz = oldlen;
200         
201                         // Last check cause the 'new' below is expensive
202                         newkvs = _newkvs.load(memory_order_acquire);
203                         if (newkvs != NULL) return newkvs;
204         
205                         newkvs = new kvs_data(newsz);
206                         void *chm = (void*) new CHM(sz);
207                         newkvs->_data[0].store(chm, memory_order_release);
208         
209                         kvs_data *cur_newkvs; 
210                         // Another check after the slow allocation
211                         if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
212                                 return cur_newkvs;
213                         // CAS the _newkvs to the allocated table
214                         kvs_data *desired = (kvs_data*) NULL;
215                         kvs_data *expected = (kvs_data*) newkvs; 
216                         if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
217                                         memory_order_release)) {
218                                 // Should clean the allocated area
219                                 delete newkvs;
220                                 newkvs = _newkvs.load(memory_order_acquire);
221                         }
222                         return newkvs;
223                 }
224         
225                 void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
226                         bool copy_all) {
227                         MODEL_ASSERT (get_chm(oldkvs) == this);
228                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
229                         int oldlen = oldkvs->_size;
230                         int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
231                 
232                         // Just follow Cliff Click's code here
233                         int panic_start = -1;
234                         int copyidx;
235                         while (_copy_done.load(memory_order_acquire) < oldlen) {
236                                 copyidx = _copy_idx.load(memory_order_acquire);
237                                 if (panic_start == -1) { // No painc
238                                         copyidx = _copy_idx.load(memory_order_acquire);
239                                         while (copyidx < (oldlen << 1) &&
240                                                 !_copy_idx.compare_exchange_strong(copyidx, copyidx +
241                                                         min_copy_work, memory_order_release, memory_order_release))
242                                                 copyidx = _copy_idx.load(memory_order_relaxed);
243                                         if (!(copyidx < (oldlen << 1)))
244                                                 panic_start = copyidx;
245                                 }
246         
247                                 // Now copy the chunk of work we claimed
248                                 int workdone = 0;
249                                 for (int i = 0; i < min_copy_work; i++)
250                                         if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
251                                                 newkvs))
252                                                 workdone++;
253                                 if (workdone > 0)
254                                         copy_check_and_promote(topmap, oldkvs, workdone);
255         
256                                 copyidx += min_copy_work;
257                                 if (!copy_all && panic_start == -1)
258                                         return; // We are done with the work we claim
259                         }
260                         copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
261                 }
262         
263                 kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
264                         *oldkvs, int idx, void *should_help) {
265                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
266                         // We're only here cause the caller saw a Prime
267                         if (copy_slot(topmap, idx, oldkvs, newkvs))
268                                 copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
269                         return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
270                 }
271         
272                 void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
273                         oldkvs, int workdone) {
274                         int oldlen = oldkvs->_size;
275                         int copyDone = _copy_done.load(memory_order_relaxed);
276                         if (workdone > 0) {
277                                 while (true) {
278                                         copyDone = _copy_done.load(memory_order_relaxed);
279                                         if (_copy_done.compare_exchange_weak(copyDone, copyDone +
280                                                 workdone, memory_order_relaxed, memory_order_relaxed))
281                                                 break;
282                                 }
283                         }
284         
285                         // Promote the new table to the current table
286                         if (copyDone + workdone == oldlen &&
287                                 topmap->_kvs.load(memory_order_acquire) == oldkvs) {
288                                 kvs_data *newkvs = _newkvs.load(memory_order_acquire);
289                                 topmap->_kvs.compare_exchange_strong(oldkvs, newkvs, memory_order_release,
290                                         memory_order_release);
291                         }
292                 }
293         
294                 bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
295                         kvs_data *newkvs) {
296                         slot *key_slot;
297                         while ((key_slot = key(oldkvs, idx)) == NULL)
298                                 CAS_key(oldkvs, idx, NULL, TOMBSTONE);
299         
300                         // First CAS old to Prime
301                         slot *oldval = val(oldkvs, idx);
302                         while (!is_prime(oldval)) {
303                                 slot *box = (oldval == NULL || oldval == TOMBSTONE)
304                                         ? TOMBPRIME : new slot(true, oldval->_ptr);
305                                 if (CAS_val(oldkvs, idx, oldval, box)) {
306                                         if (box == TOMBPRIME)
307                                                 return 1; // Copy done
308                                         // Otherwise we CAS'd the box
309                                         oldval = box; // Record updated oldval
310                                         break;
311                                 }
312                                 oldval = val(oldkvs, idx); // Else re-try
313                         }
314         
315                         if (oldval == TOMBPRIME) return false; // Copy already completed here
316         
317                         slot *old_unboxed = new slot(false, oldval->_ptr);
318                         int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
319                                 NULL) == NULL);
320         
321                         // Old value is exposed in the new table
322                         while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
323                                 oldval = val(oldkvs, idx);
324         
325                         return copied_into_new;
326                 }
327         };
328
329         
330
331         private:
332         static const int Default_Init_Size = 4; // Intial table size
333
334         static slot* const MATCH_ANY;
335         static slot* const NO_MATCH_OLD;
336
337         static slot* const TOMBPRIME;
338         static slot* const TOMBSTONE;
339
340         static const int REPROBE_LIMIT = 10; // Forces a table-resize
341
342         atomic<kvs_data*> _kvs;
343
344         public:
345         cliffc_hashtable() {
346                 // Should initialize the CHM for the construction of the table
347                 // For other CHM in kvs_data, they should be initialzed in resize()
348                 // because the size is determined dynamically
349                 kvs_data *kvs = new kvs_data(Default_Init_Size);
350                 void *chm = (void*) new CHM(0);
351                 kvs->_data[0].store(chm, memory_order_relaxed);
352                 _kvs.store(kvs, memory_order_release);
353         }
354
355         cliffc_hashtable(int init_size) {
356                 // Should initialize the CHM for the construction of the table
357                 // For other CHM in kvs_data, they should be initialzed in resize()
358                 // because the size is determined dynamically
359                 kvs_data *kvs = new kvs_data(init_size);
360                 void *chm = (void*) new CHM(0);
361                 kvs->_data[0].store(chm, memory_order_relaxed);
362                 _kvs.store(kvs, memory_order_release);
363         }
364
365         /**
366                 @Begin
367                 @Interface: Get
368                 @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
369                 @ID: __sequential.getKeyTag(key)
370                 @Action:
371                         TypeV *_Old_Val = __sequential.map.get(key)
372                 @Post_check:
373                         __sequential.equals_val(_Old_Val, __RET__)
374                 @End
375         */
376         TypeV* get(TypeK& key) {
377                 void *key_ptr = (void*) new TypeK(key);
378                 slot *key_slot = new slot(false, key_ptr);
379                 int fullhash = hash(key_slot);
380                 kvs_data *kvs = _kvs.load(memory_order_acquire);
381                 slot *V = get_impl(this, kvs, key_slot, fullhash);
382                 if (V == NULL) return NULL;
383                 MODEL_ASSERT (!is_prime(V));
384                 return (TypeV*) V->_ptr;
385         }
386
387         /**
388                 @Begin
389                 @Interface: Put
390                 @Commit_point_set: Write_Val_Point
391                 @ID: __sequential.getKeyTag(key)
392                 @Action:
393                         # Remember this old value at checking point
394                         TypeV *_Old_Val = __sequential.map.get(key)
395                         __sequential.map.put(key, &val);
396                 @Post_check:
397                         __sequential.equals_val(__RET__, _Old_Val)
398                 @End
399         */
400         TypeV* put(TypeK& key, TypeV& val) {
401                 return putIfMatch(key, val, NO_MATCH_OLD);
402         }
403
404         /**
405                 @Begin
406                 @Interface: PutIfAbsent
407                 @Commit_point_set:
408                         Write_Val_Point | PutIfAbsent_Fail_Point
409                 @Condition: __sequential.map.get(key) == NULL
410                 @HB_condition:
411                         COND_PutIfAbsentSucc :: __RET__ == NULL
412                 @ID: __sequential.getKeyTag(key)
413                 @Action:
414                         TypeV *_Old_Val = __sequential.map.get(key)
415                         if (__COND_SAT__)
416                                 __sequential.map.put(key, &value);
417                 @Post_check:
418                         __COND_SAT__ ? __RET__ == NULL : __sequential.equals_val(_Old_Val, __RET__) 
419                 @End
420         */
421         TypeV* putIfAbsent(TypeK& key, TypeV& value) {
422                 return putIfMatch(key, val, TOMBSTONE);
423         }
424
425         /**
426                 @Begin
427                 @Interface: RemoveAny
428                 @Commit_point_set: Write_Val_Point
429                 @ID: __sequential.getKeyTag(key)
430                 @Action:
431                         TypeV *_Old_Val = __sequential.map.get(key)
432                         __sequential.map.put(key, NULL);
433                 @Post_check:
434                         __sequential.equals_val(__RET__, _Old_Val)
435                 @End
436         */
437         TypeV* remove(TypeK& key) {
438                 return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
439         }
440
441         /**
442                 @Begin
443                 @Interface: RemoveIfMatch
444                 @Commit_point_set:
445                         Write_Val_Point | RemoveIfMatch_Fail_Point
446                 @Condition:
447                         __sequential.equals_val(__sequential.map.get(key), &val)
448                 @HB_condition:
449                         COND_RemoveIfMatchSucc :: __RET__ == true
450                 @ID: __sequential.getKeyTag(key)
451                 @Action:
452                         if (__COND_SAT__)
453                                 __sequential.map.put(key, NULL);
454                 @Post_check:
455                         __COND_SAT__ ? __RET__ : !__RET__
456                 @End
457         */
458         bool remove(TypeK& key, TypeV& val) {
459                 slot *val_slot = val == NULL ? NULL : new slot(false, val);
460                 return putIfMatch(key, TOMBSTONE, val) == val;
461
462         }
463
464         /**
465                 @Begin
466                 @Interface: ReplaceAny
467                 @Commit_point_set:
468                         Write_Val_Point
469                 @ID: __sequential.getKeyTag(key)
470                 @Action:
471                         TypeV *_Old_Val = __sequential.map.get(key)
472                 @Post_check:
473                         __sequential.equals_val(__RET__, _Old_Val)
474                 @End
475         */
476         TypeV* replace(TypeK& key, TypeV& val) {
477                 return putIfMatch(key, val, MATCH_ANY);
478         }
479
480         /**
481                 @Begin
482                 @Interface: ReplaceIfMatch
483                 @Commit_point_set:
484                         Write_Val_Point | ReplaceIfMatch_Fail_Point
485                 @Condition:
486                         __sequential.equals_val(__sequential.map.get(key), &oldval)
487                 @HB_condition:
488                         COND_ReplaceIfMatchSucc :: __RET__ == true
489                 @ID: __sequential.getKeyTag(key)
490                 @Action:
491                         if (__COND_SAT__)
492                                 __sequential.map.put(key, &newval);
493                 @Post_check:
494                         __COND_SAT__ ? __RET__ : !__RET__
495                 @End
496         */
497         bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
498                 return putIfMatch(key, newval, oldval) == oldval;
499         }
500
501         private:
502         static CHM* get_chm(kvs_data* kvs) {
503                 CHM *res = (CHM*) kvs->_data[0].load(memory_order_relaxed);
504                 return res;
505         }
506
507         static int* get_hashes(kvs_data *kvs) {
508                 return (int *) kvs->_data[1].load(memory_order_relaxed);
509         }
510         
511         // Preserve happens-before semantics on newly inserted keys
512         static inline slot* key(kvs_data *kvs, int idx) {
513                 MODEL_ASSERT (idx >= 0 && idx < kvs->_size);
514                 // Corresponding to the volatile read in get_impl() and putIfMatch in
515                 // Cliff Click's Java implementation
516                 slot *res = (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
517                 return res;
518         }
519
520         /**
521                 The atomic operation in val() function is a "potential" commit point,
522                 which means in some case it is a real commit point while it is not for
523                 some other cases. This so happens because the val() function is such a
524                 fundamental function that many internal operation will call. Our
525                 strategy is that we label any potential commit points and check if they
526                 really are the commit points later.
527         */
528         // Preserve happens-before semantics on newly inserted values
529         static inline slot* val(kvs_data *kvs, int idx) {
530                 MODEL_ASSERT (idx >= 0 && idx < kvs->_size);
531                 // Corresponding to the volatile read in get_impl() and putIfMatch in
532                 // Cliff Click's Java implementation
533                 slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
534                 /**
535                         @Begin
536                         # This is a complicated potential commit point since many many functions are
537                         # calling val().
538                         @Potential_commit_point_define: true
539                         @Label: Read_Val_Point
540                         @End
541                 */
542                 return res;
543
544
545         }
546
547         static int hash(slot *key_slot) {
548                 MODEL_ASSERT(key_slot != NULL && key_slot->_ptr != NULL);
549                 TypeK* key = (TypeK*) key_slot->_ptr;
550                 int h = key->hashCode();
551                 // Spread bits according to Cliff Click's code
552                 h += (h << 15) ^ 0xffffcd7d;
553                 h ^= (h >> 10);
554                 h += (h << 3);
555                 h ^= (h >> 6);
556                 h += (h << 2) + (h << 14);
557                 return h ^ (h >> 16);
558         }
559         
560         // Heuristic to decide if reprobed too many times. 
561         // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
562         // put it triggers a table resize. Several places MUST have exact agreement.
563         static int reprobe_limit(int len) {
564                 return REPROBE_LIMIT + (len >> 2);
565         }
566         
567         static inline bool is_prime(slot *val) {
568                 return (val != NULL) && val->_prime;
569         }
570
571         // Check for key equality. Try direct pointer comparison first (fast
572         // negative teset) and then the full 'equals' call
573         static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
574                 int fullhash) {
575                 // Caller should've checked this.
576                 MODEL_ASSERT (K != NULL);
577                 TypeK* key_ptr = (TypeK*) key_slot->_ptr;
578                 return
579                         K == key_slot ||
580                                 ((hashes[hash] == 0 || hashes[hash] == fullhash) &&
581                                 K != TOMBSTONE &&
582                                 key_ptr->equals(K->_ptr));
583         }
584
585         static bool valeq(slot *val_slot1, slot *val_slot2) {
586                 MODEL_ASSERT (val_slot1 != NULL);
587                 TypeK* ptr1 = (TypeV*) val_slot1->_ptr;
588                 if (val_slot2 == NULL || ptr1 == NULL) return false;
589                 return ptr1->equals(val_slot2->_ptr);
590         }
591         
592         // Together with key() preserve the happens-before relationship on newly
593         // inserted keys
594         static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
595                 return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
596                         desired, memory_order_release, memory_order_release);
597         }
598
599         /**
600                 Same as the val() function, we only label the CAS operation as the
601                 potential commit point.
602         */
603         // Together with val() preserve the happens-before relationship on newly
604         // inserted values
605         static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
606                 *desired) {
607                 bool res =  kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
608                         desired, memory_order_release, memory_order_release);
609                 /**
610                         # If it is a successful put instead of a copy or any other internal
611                         # operantions, expected != NULL
612                         @Begin
613                         @Potential_commit_point_define: __ATOMIC_RET__ == true
614                         @Label: Write_Val_Point
615                         @End
616                 */
617                 return res;
618         }
619
620         slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
621                 fullhash) {
622                 int len = kvs->_size;
623                 CHM *chm = get_chm(kvs);
624                 int *hashes = get_hashes(kvs);
625
626                 int idx = fullhash & (len - 1);
627                 int reprobe_cnt = 0;
628                 while (true) {
629                         slot *K = key(kvs, idx);
630                         slot *V = val(kvs, idx);
631                         /**
632                                 @Begin
633                                 @Commit_point_define: V == NULL
634                                 @Potential_commit_point_label: Read_Val_Point
635                                 @Label: Get_Success_Point_1
636                                 @End
637                         */
638
639                         if (K == NULL) return NULL; // A miss
640                         
641                         if (keyeq(K, key_slot, hashes, idx, fullhash)) {
642                                 // Key hit! Check if table-resize in progress
643                                 if (!is_prime(V)) {
644                                         /**
645                                                 @Begin
646                                                 @Commit_point_define: true
647                                                 @Potential_commit_point_label: Read_Val_Point
648                                                 @Label: Get_Success_Point_2
649                                                 @End
650                                         */
651                                         return (V == TOMBSTONE) ? NULL : V; // Return this value
652                                 }
653                                 // Otherwise, finish the copy & retry in the new table
654                                 return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
655                                         idx, key_slot), key_slot, fullhash);
656                         }
657
658                         if (++reprobe_cnt >= REPROBE_LIMIT ||
659                                 key_slot == TOMBSTONE) {
660                                 // Retry in new table
661                                 // Atomic read (acquire) can be here 
662                                 kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
663                                 /**
664                                         @Begin
665                                         @Commit_point_define_check: newkvs == NULL
666                                         @Label: Get_Success_Point_3
667                                         @End
668                                 */
669                                 return newkvs == NULL ? NULL : get_impl(topmap,
670                                         topmap->help_copy(newkvs), key_slot, fullhash);
671                         }
672
673                         idx = (idx + 1) & (len - 1); // Reprobe by 1
674                 }
675         }
676
677         // A wrapper of the essential function putIfMatch()
678         TypeV* putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
679                 // TODO: Should throw an exception rather return NULL
680                 if (old_val == NULL) {
681                         return NULL;
682                 }
683                 void *key_ptr = (void*) new TypeK(key);
684                 slot *key_slot = new slot(false, key_ptr);
685
686                 void *val_ptr = (void*) new TypeV(value);
687                 slot *value_slot = new slot(false, val_ptr);
688                 kvs_data *kvs = _kvs.load(memory_order_acquire);
689                 slot *res = putIfMatch(this, kvs, key_slot, value_slot, old_val);
690                 // Only when copy_slot() call putIfMatch() will it return NULL
691                 MODEL_ASSERT (res != NULL); 
692                 MODEL_ASSERT (!is_prime(res));
693                 return res == TOMBSTONE ? NULL : (TypeV*) res->_ptr;
694         }
695
696         /**
697                 Put, Remove, PutIfAbsent, etc will call this function. Return the old
698                 value. If the returned value is equals to the expVal (or expVal is
699                 NO_MATCH_OLD), then this function puts the val_slot to the table 'kvs'.
700                 Only copy_slot will pass a NULL expVal, and putIfMatch only returns a
701                 NULL if passed a NULL expVal.
702         */
703         static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
704                 *key_slot, slot *val_slot, slot *expVal) {
705                 MODEL_ASSERT (val_slot != NULL);
706                 MODEL_ASSERT (!is_prime(val_slot));
707                 MODEL_ASSERT (!is_prime(expVal));
708
709                 int fullhash = hash(key_slot);
710                 int len = kvs->_size;
711                 CHM *chm = get_chm(kvs);
712                 int *hashes = get_hashes(kvs);
713                 int idx = fullhash & (len - 1);
714
715                 // Claim a key slot
716                 int reprobe_cnt = 0;
717                 slot *K;
718                 slot *V;
719                 kvs_data *newkvs;
720                 
721                 while (true) { // Spin till we get a key slot
722                         K = key(kvs, idx);
723                         V = val(kvs, idx);
724                         if (K == NULL) { // Get a free slot
725                                 if (val_slot == TOMBSTONE) return val_slot;
726                                 // Claim the null key-slot
727                                 if (CAS_key(kvs, idx, NULL, key_slot)) {
728                                         chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
729                                         hashes[idx] = fullhash; // Memorize full hash
730                                         break;
731                                 }
732                                 K = key(kvs, idx); // CAS failed, get updated value
733                                 MODEL_ASSERT (K != NULL);
734                         }
735
736                         // Key slot not null, there exists a Key here
737                         if (keyeq(K, key_slot, hashes, idx, fullhash))
738                                 break; // Got it
739                         
740                         // Notice that the logic here should be consistent with that of get.
741                         // The first predicate means too many reprobes means nothing in the
742                         // old table.
743                         if (++reprobe_cnt >= reprobe_limit(len) ||
744                                 K == TOMBSTONE) { // Found a Tombstone key, no more keys
745                                 newkvs = chm->resize(topmap, kvs);
746                                 // Help along an existing copy
747                                 if (expVal != NULL) topmap->help_copy(newkvs);
748                                 return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
749                         }
750
751                         idx = (idx + 1) & (len - 1); // Reprobe
752                 } // End of spinning till we get a Key slot
753
754                 if (val_slot == V) return V; // Fast cutout for no-change
755         
756                 // Here it tries to resize cause it doesn't want other threads to stop
757                 // its progress (eagerly try to resize soon)
758                 newkvs = chm->_newkvs.load(memory_order_acquire);
759                 if (newkvs == NULL &&
760                         ((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
761                         newkvs = chm->resize(topmap, kvs); // Force the copy to start
762                 
763                 // Finish the copy and then put it in the new table
764                 if (newkvs != NULL)
765                         return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
766                                 expVal), key_slot, val_slot, expVal);
767                 
768                 // Decided to update the existing table
769                 while (true) {
770                         MODEL_ASSERT (!is_prime(V));
771
772                         if (expVal != NO_MATCH_OLD &&
773                                 V != expVal &&
774                                 (expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
775                                 !(V == NULL && expVal == TOMBSTONE) &&
776                                 (expVal == NULL || !valeq(expVal, V))) {
777                                 /**
778                                         @Begin
779                                         @Commit_point_define: expVal == TOMBSTONE
780                                         @Potential_commit_point_label: Read_Val_Point
781                                         @Label: PutIfAbsent_Fail_Point
782                                                 # This is a check for the PutIfAbsent() when the value
783                                                 # is not absent
784                                         @End
785                                 */
786                                 /**
787                                         @Begin
788                                         @Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
789                                         @Potential_commit_point_label: Read_Val_Point
790                                         @Label: RemoveIfMatch_Fail_Point
791                                         @End
792                                 */
793                                 /**
794                                         @Begin
795                                         @Commit_point_define: !valeq(expVal, V)
796                                         @Potential_commit_point_label: Read_Val_Point
797                                         @Label: ReplaceIfMatch_Fail_Point
798                                         @End
799                                 */
800                                 return V; // Do not update!
801                         }
802
803                         if (CAS_val(kvs, idx, V, val_slot)) {
804                                 /**
805                                         @Begin
806                                         # The only point where a successful put happens
807                                         @Commit_point_define: true
808                                         @Potential_commit_point_label: Write_Val_Point
809                                         @Label: Write_Success_Point
810                                         @End
811                                 */
812                                 if (expVal != NULL) { // Not called by a table-copy
813                                         // CAS succeeded, should adjust size
814                                         // Both normal put's and table-copy calls putIfMatch, but
815                                         // table-copy does not increase the number of live K/V pairs
816                                         if ((V == NULL || V == TOMBSTONE) &&
817                                                 val_slot != TOMBSTONE)
818                                                 chm->_size.fetch_add(1, memory_order_relaxed);
819                                         if (!(V == NULL || V == TOMBSTONE) &&
820                                                 val_slot == TOMBSTONE)
821                                                 chm->_size.fetch_add(-1, memory_order_relaxed);
822                                 }
823                                 return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
824                         }
825                         // Else CAS failed
826                         V = val(kvs, idx);
827                         if (is_prime(V))
828                                 return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
829                                         idx, expVal), key_slot, val_slot, expVal);
830                 }
831         }
832
833         // Help along an existing table-resize. This is a fast cut-out wrapper.
834         kvs_data* help_copy(kvs_data *helper) {
835                 kvs_data *topkvs = _kvs.load(memory_order_acquire);
836                 CHM *topchm = get_chm(topkvs);
837                 // No cpy in progress
838                 if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
839                 topchm->help_copy_impl(this, topkvs, false);
840                 return helper;
841         }
842 };
843
844 #endif