// From the cdsspec-compiler benchmarks: cliffc-hashtable/cliffc_hashtable.h
// (gitweb capture; commit message: "add mpmc")
1 #ifndef CLIFFC_HASHTABLE_H
2 #define CLIFFC_HASHTABLE_H
3
4 #include <iostream>
5 #include <atomic>
6 #include <memory>
7 #include <assert.h>
8
9 using namespace std;
10
11
12
13 /**
14         This header file declares and defines a simplified version of Cliff Click's
	NonblockingHashMap. It contains all the necessary structures and main
16         functions. In simplified_cliffc_hashtable.cc file, it has the definition for
17         the static fields.
18 */
19
20 template<typename TypeK, typename TypeV>
21 class cliffc_hashtable;
22
23 /**
	Corresponding to the Object[] array in Cliff Click's Java implementation.
25         It keeps the first two slots for CHM (Hashtable control unit) and the hash
26         records (an array of hash used for fast negative key-equality check).
27 */
/**
	Corresponds to the Object[] array in Cliff Click's Java implementation.
	Layout of _data: slot 0 holds the CHM control block, slot 1 holds the
	hash-record array (for fast negative key-equality checks), and slots
	2..(2*_size+1) hold alternating key/value slot pointers.
*/
struct kvs_data {
	int _size;            // Number of K,V pairs this table can hold
	atomic<void*> *_data; // Backing array: [CHM, hashes, K0, V0, K1, V1, ...]
	
	kvs_data(int sz) {
		_size = sz;
		// Fix: the original computed sizeof(atomic<void*>) * 2 + 2, a
		// constant (18 with 8-byte pointers) that only coincidentally
		// matched the default table size of 8 pairs and overflows for any
		// larger table. The table needs two header slots plus two slots
		// (key + value) per pair.
		int real_size = sz * 2 + 2;
		_data = new atomic<void*>[real_size];
		// The control block (slot 0) should be initialized in resize()
		// Init the hash record array (slot 1)
		int *hashes = new int[_size];
		int i;
		for (i = 0; i < _size; i++) {
			hashes[i] = 0;
		}
		_data[1].store(hashes, memory_order_relaxed);
		// Init the data to Null slot
		for (i = 2; i < real_size; i++) {
			_data[i].store(NULL, memory_order_relaxed);
		}
	}

	~kvs_data() {
		int *hashes = (int*) _data[1].load(memory_order_relaxed);
		// Fix: arrays allocated with new[] must be released with delete[]
		delete[] hashes;
		delete[] _data;
		// NOTE(review): the CHM stored in _data[0] is not freed here; its
		// ownership appears to live with the hashtable code -- confirm.
	}
};
56
struct slot {
	// True when this slot has been "boxed" (primed) during a table copy.
	bool _prime;
	// Type-erased payload; holds a TypeK or TypeV instance.
	shared_ptr<void> _ptr;

	// Build a slot around an existing payload pointer.
	slot(bool prime, shared_ptr<void> ptr) : _prime(prime), _ptr(ptr) {
	}
};
67
68
69 /**
	TypeK must have defined function "int hashCode()" which returns the hash
	code for its object, and "int equals(TypeK anotherKey)" which is
72         used to judge equality.
73         TypeK and TypeV should define their own copy constructor.
74         To make the memory management safe and similar to Cliff Click's Java
75         implementation, we use shared_ptr instead of normal pointer in terms of the
76         pointers that point to TypeK and TypeV.
77 */
78 template<typename TypeK, typename TypeV>
79 class cliffc_hashtable {
80         /**
81                 # The synchronization we have for the hashtable gives us the property of
82                 # serializability, so we should have a sequential hashtable when we check the
83                 # correctness. The key thing is to identify all the commit point.
84         
85                 @Begin
86                 @Options:
87                         LANG = C;
88                         CLASS = cliffc_hashtable;
89                 @Global_define:
90                         @DeclareVar:
91                         spec_hashtable<TypeK, TypeV*> map;
92                         spec_hashtable<TypeK, Tag> id_map;
93                         Tag tag;
94                         @InitVar:
95                                 map = spec_hashtable<TypeK, TypeV*>();
				id_map = spec_hashtable<TypeK, Tag>();
97                                 tag = Tag();
98                         @DefineFunc:
99                         bool equals_val(TypeV *ptr1, TypeV *ptr2) {
100                                 // ...
101                         }
102                         
103                         @DefineFunc:
104                         # Update the tag for the current key slot if the corresponding tag
105                         # is NULL, otherwise just return that tag. It will update the next
106                         # available tag too if it requires a new tag for that key slot.
107                         Tag getKeyTag(TypeK &key) {
108                                 if (id_map.get(key) == NULL) {
109                                         Tag cur_tag = tag.current();
110                                         id_map.put(key, cur_tag);
111                                         tag.next();
112                                         return cur_tag;
113                                 } else {
114                                         return id_map.get(key);
115                                 }
116                         }
117                 
118                 @Interface_cluster:
119                         Read_interface = {
120                                 Get,
121                                 PutIfAbsent,
122                                 RemoveAny,
123                                 RemoveIfMatch,
124                                 ReplaceAny,
125                                 ReplaceIfMatch
126                         }
127                         
128                         Write_interface = {
129                                 Put,
130                                 PutIfAbsent(COND_PutIfAbsentSucc),
131                                 RemoveAny,
132                                 RemoveIfMatch(COND_RemoveIfMatchSucc),
133                                 ReplaceAny,
134                                 ReplaceIfMatch(COND_ReplaceIfMatchSucc)
135                         }
136                 @Happens_before:
137                         Write_interface -> Read_interface
138                 @End
139         */
140
141 friend class CHM;
142         /**
143                 The control structure for the hashtable
144         */
145         private:
146         class CHM {
147                 friend class cliffc_hashtable;
148                 private:
149                 atomic<kvs_data*> _newkvs;
150                 
151                 // Size of active K,V pairs
152                 atomic_int _size;
153         
154                 // Count of used slots
155                 atomic_int _slots;
156                 
157                 // The next part of the table to copy
158                 atomic_int _copy_idx;
159                 
160                 // Work-done reporting
161                 atomic_int _copy_done;
162         
163                 public:
164                 CHM(int size) {
165                         _size.store(size, memory_order_relaxed);
166                         _slots.store(0, memory_order_relaxed);
167         
168                         _copy_idx.store(0, memory_order_relaxed);
169                         _copy_done.store(0, memory_order_release);
170                 }
171         
172                 ~CHM() {}
173                 
174                 private:
175                         
176                 // Heuristic to decide if the table is too full
177                 bool table_full(int reprobe_cnt, int len) {
178                         return
179                                 reprobe_cnt >= REPROBE_LIMIT &&
180                                 _slots.load(memory_order_relaxed) >= reprobe_limit(len);
181                 }
182         
183                 kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
184                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
185                         if (newkvs != NULL)
186                                 return newkvs;
187         
188                         // No copy in-progress, start one; Only double the table size
189                         int oldlen = kvs->_size;
190                         int sz = _size.load(memory_order_relaxed);
191                         int newsz = sz;
192                         
193                         // Just follow Cliff Click's heuristic to decide the new size
194                         if (sz >= (oldlen >> 2)) { // If we are 25% full
195                                 newsz = oldlen << 1; // Double size
196                                 if (sz >= (oldlen >> 1))
197                                         newsz = oldlen << 2; // Double double size
198                         }
199         
200                         // We do not record the record timestamp
201                         if (newsz <= oldlen) newsz = oldlen << 1;
202                         // Do not shrink ever
203                         if (newsz < oldlen) newsz = oldlen;
204         
205                         // Last check cause the 'new' below is expensive
206                         newkvs = _newkvs.load(memory_order_acquire);
207                         if (newkvs != NULL) return newkvs;
208         
209                         newkvs = new kvs_data(newsz);
210                         void *chm = (void*) new CHM(sz);
211                         newkvs->_data[0].store(chm, memory_order_relaxed);
212         
213                         kvs_data *cur_newkvs; 
214                         // Another check after the slow allocation
215                         if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
216                                 return cur_newkvs;
217                         // CAS the _newkvs to the allocated table
218                         kvs_data *desired = (kvs_data*) NULL;
219                         kvs_data *expected = (kvs_data*) newkvs; 
220                         if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
221                                         memory_order_release)) {
222                                 // Should clean the allocated area
223                                 delete newkvs;
224                                 newkvs = _newkvs.load(memory_order_acquire);
225                         }
226                         return newkvs;
227                 }
228         
229                 void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
230                         bool copy_all) {
231                         assert (get_chm(oldkvs) == this);
232                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
233                         int oldlen = oldkvs->_size;
234                         int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
235                 
236                         // Just follow Cliff Click's code here
237                         int panic_start = -1;
238                         int copyidx;
239                         while (_copy_done.load(memory_order_acquire) < oldlen) {
240                                 copyidx = _copy_idx.load(memory_order_acquire);
241                                 if (panic_start == -1) { // No painc
242                                         copyidx = _copy_idx.load(memory_order_acquire);
243                                         while (copyidx < (oldlen << 1) &&
244                                                 !_copy_idx.compare_exchange_strong(copyidx, copyidx +
245                                                         min_copy_work, memory_order_release, memory_order_release))
246                                                 copyidx = _copy_idx.load(memory_order_relaxed);
247                                         if (!(copyidx < (oldlen << 1)))
248                                                 panic_start = copyidx;
249                                 }
250         
251                                 // Now copy the chunk of work we claimed
252                                 int workdone = 0;
253                                 for (int i = 0; i < min_copy_work; i++)
254                                         if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
255                                                 newkvs))
256                                                 workdone++;
257                                 if (workdone > 0)
258                                         copy_check_and_promote(topmap, oldkvs, workdone);
259         
260                                 copyidx += min_copy_work;
261                                 if (!copy_all && panic_start == -1)
262                                         return; // We are done with the work we claim
263                         }
264                         copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
265                 }
266         
267                 kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
268                         *oldkvs, int idx, void *should_help) {
269                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
270                         // We're only here cause the caller saw a Prime
271                         if (copy_slot(topmap, idx, oldkvs, _newkvs))
272                                 copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
273                         return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
274                 }
275         
276                 void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
277                         oldkvs, int workdone) {
278                         int oldlen = oldkvs->_size;
279                         int copyDone = _copy_done.load(memory_order_relaxed);
280                         if (workdone > 0) {
281                                 while (true) {
282                                         copyDone = _copy_done.load(memory_order_relaxed);
283                                         if (_copy_done.compare_exchange_weak(copyDone, copyDone +
284                                                 workdone, memory_order_relaxed, memory_order_relaxed))
285                                                 break;
286                                 }
287                         }
288         
289                         // Promote the new table to the current table
290                         if (copyDone + workdone == oldlen &&
291                                 topmap->_kvs.load(memory_order_acquire) == oldkvs)
292                                 topmap->_kvs.compare_exchange_strong(oldkvs, _newkvs, memory_order_release,
293                                         memory_order_release);
294                 }
295         
296                 bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
297                         kvs_data *newkvs) {
298                         slot *key_slot;
299                         while ((key_slot = key(oldkvs, idx)) == NULL)
300                                 CAS_key(oldkvs, idx, NULL, TOMBSTONE);
301         
302                         // First CAS old to Prime
303                         slot *oldval = val(oldkvs, idx);
304                         while (!is_prime(oldval)) {
305                                 slot *box = (oldval == NULL || oldval == TOMBSTONE)
306                                         ? TOMBPRIME : new slot(true, oldval->_ptr);
307                                 if (CAS_val(oldkvs, idx, oldval, box)) {
308                                         if (box == TOMBPRIME)
309                                                 return 1; // Copy done
310                                         // Otherwise we CAS'd the box
311                                         oldval = box; // Record updated oldval
312                                         break;
313                                 }
314                                 oldval = val(oldkvs, idx); // Else re-try
315                         }
316         
317                         if (oldval == TOMBPRIME) return false; // Copy already completed here
318         
319                         slot *old_unboxed = new slot(false, oldval->_ptr);
320                         int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
321                                 NULL) == NULL);
322         
323                         // Old value is exposed in the new table
324                         while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
325                                 oldval = val(oldkvs, idx);
326         
327                         return copied_into_new;
328                 }
329         };
330
331         
332
	private:
	static const int Default_Init_Size = 8; // Initial table size

	// Sentinel slots (defined out-of-line). MATCH_ANY / NO_MATCH_OLD are
	// wildcard expected-values for putIfMatch; TOMBSTONE marks a removed
	// entry; TOMBPRIME marks a removed entry during a table copy.
	static slot* const MATCH_ANY;
	static slot* const NO_MATCH_OLD;

	static slot* const TOMBPRIME;
	static slot* const TOMBSTONE;

	static const int REPROBE_LIMIT = 10; // Forces a table-resize

	// The current key-value table; swapped (CAS) when a resize completes.
	atomic<kvs_data*> _kvs;
345
346         public:
347         cliffc_hashtable() {
348                 // Should initialize the CHM for the construction of the table
349                 // For other CHM in kvs_data, they should be initialzed in resize()
350                 // because the size is determined dynamically
351                 kvs_data *kvs = new kvs_data(Default_Init_Size);
352                 void *chm = (void*) new CHM(0);
353                 kvs->_data[0].store(chm, memory_order_relaxed);
354                 _kvs.store(kvs, memory_order_release);
355         }
356
357         cliffc_hashtable(int init_size) {
358                 // Should initialize the CHM for the construction of the table
359                 // For other CHM in kvs_data, they should be initialzed in resize()
360                 // because the size is determined dynamically
361                 kvs_data *kvs = new kvs_data(init_size);
362                 void *chm = (void*) new CHM(0);
363                 kvs->_data[0].store(chm, memory_order_relaxed);
364                 _kvs.store(kvs, memory_order_release);
365         }
366
367         /**
368                 @Begin
369                 @Interface: Get
370                 @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
371                 @ID: __sequential.getKeyTag(key)
372                 @Action:
373                         TypeV *_Old_Val = __sequential.map.get(key)
374                 @Post_check:
375                         __sequential.equals_val(_Old_Val, __RET__)
376                 @End
377         */
378         shared_ptr<TypeV> get(TypeK& key) {
379                 void *key_ptr = (void*) new TypeK(key);
380                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
381                 int fullhash = hash(key_slot);
382                 slot *V = get_impl(this, _kvs, key_slot, fullhash);
383                 if (V == NULL) return NULL;
384                 assert (!is_prime(V));
385                 return static_pointer_cast<TypeV>(V->_ptr);
386         }
387
388         /**
389                 @Begin
390                 @Interface: Put
391                 @Commit_point_set: Write_Val_Point
392                 @ID: __sequential.getKeyTag(key)
393                 @Action:
394                         # Remember this old value at checking point
395                         TypeV *_Old_Val = __sequential.map.get(key)
396                         __sequential.map.put(key, &val);
397                 @Post_check:
398                         __sequential.equals_val(__RET__, _Old_Val)
399                 @End
400         */
401         shared_ptr<TypeV> put(TypeK& key, TypeV& val) {
402                 return putIfMatch(key, val, NO_MATCH_OLD);
403         }
404
405         /**
406                 @Begin
407                 @Interface: PutIfAbsent
408                 @Commit_point_set:
409                         Write_Val_Point | PutIfAbsent_Fail_Point
410                 @Condition: __sequential.map.get(key) == NULL
411                 @HB_condition:
412                         COND_PutIfAbsentSucc :: __RET__ == NULL
413                 @ID: __sequential.getKeyTag(key)
414                 @Action:
415                         TypeV *_Old_Val = __sequential.map.get(key)
416                         if (__COND_SAT__)
417                                 __sequential.map.put(key, &value);
418                 @Post_check:
419                         __COND_SAT__ ? __RET__ == NULL : __sequential.equals_val(_Old_Val, __RET__) 
420                 @End
421         */
422         shared_ptr<TypeV> putIfAbsent(TypeK& key, TypeV& value) {
423                 return putIfMatch(key, val, TOMBSTONE);
424         }
425
426         /**
427                 @Begin
428                 @Interface: RemoveAny
429                 @Commit_point_set: Write_Val_Point
430                 @ID: __sequential.getKeyTag(key)
431                 @Action:
432                         TypeV *_Old_Val = __sequential.map.get(key)
433                         __sequential.map.put(key, NULL);
434                 @Post_check:
435                         __sequential.equals_val(__RET__, _Old_Val)
436                 @End
437         */
438         shared_ptr<TypeV> remove(TypeK& key) {
439                 return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
440         }
441
442         /**
443                 @Begin
444                 @Interface: RemoveIfMatch
445                 @Commit_point_set:
446                         Write_Val_Point | RemoveIfMatch_Fail_Point
447                 @Condition:
448                         __sequential.equals_val(__sequential.map.get(key), &val)
449                 @HB_condition:
450                         COND_RemoveIfMatchSucc :: __RET__ == true
451                 @ID: __sequential.getKeyTag(key)
452                 @Action:
453                         if (__COND_SAT__)
454                                 __sequential.map.put(key, NULL);
455                 @Post_check:
456                         __COND_SAT__ ? __RET__ : !__RET__
457                 @End
458         */
459         bool remove(TypeK& key, TypeV& val) {
460                 slot *val_slot = val == NULL ? NULL : new slot(false, val);
461                 return putIfMatch(key, TOMBSTONE, val) == val;
462
463         }
464
465         /**
466                 @Begin
467                 @Interface: ReplaceAny
468                 @Commit_point_set:
469                         Write_Val_Point
470                 @ID: __sequential.getKeyTag(key)
471                 @Action:
472                         TypeV *_Old_Val = __sequential.map.get(key)
473                 @Post_check:
474                         __sequential.equals_val(__RET__, _Old_Val)
475                 @End
476         */
477         shared_ptr<TypeV> replace(TypeK& key, TypeV& val) {
478                 return putIfMatch(key, val, MATCH_ANY);
479         }
480
481         /**
482                 @Begin
483                 @Interface: ReplaceIfMatch
484                 @Commit_point_set:
485                         Write_Val_Point | ReplaceIfMatch_Fail_Point
486                 @Condition:
487                         __sequential.equals_val(__sequential.map.get(key), &oldval)
488                 @HB_condition:
489                         COND_ReplaceIfMatchSucc :: __RET__ == true
490                 @ID: __sequential.getKeyTag(key)
491                 @Action:
492                         if (__COND_SAT__)
493                                 __sequential.map.put(key, &newval);
494                 @Post_check:
495                         __COND_SAT__ ? __RET__ : !__RET__
496                 @End
497         */
498         bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
499                 return putIfMatch(key, newval, oldval) == oldval;
500         }
501
502         private:
503         static CHM* get_chm(kvs_data* kvs) {
504                 return (CHM*) kvs->_data[0].load(memory_order_relaxed);
505         }
506
507         static int* get_hashes(kvs_data *kvs) {
508                 return (int *) kvs->_data[1].load(memory_order_relaxed);
509         }
510         
511         // Preserve happens-before semantics on newly inserted keys
512         static inline slot* key(kvs_data *kvs, int idx) {
513                 assert (idx >= 0 && idx < kvs->_size);
514                 // Corresponding to the volatile read in get_impl() and putIfMatch in
515                 // Cliff Click's Java implementation
516                 return (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
517         }
518
519         /**
520                 The atomic operation in val() function is a "potential" commit point,
521                 which means in some case it is a real commit point while it is not for
522                 some other cases. This so happens because the val() function is such a
523                 fundamental function that many internal operation will call. Our
524                 strategy is that we label any potential commit points and check if they
525                 really are the commit points later.
526         */
	// Preserve happens-before semantics on newly inserted values
	// Value for pair 'idx' lives at data slot 2*idx+3 (keys at 2*idx+2);
	// the acquire load pairs with the release CAS in CAS_val().
	static inline slot* val(kvs_data *kvs, int idx) {
		assert (idx >= 0 && idx < kvs->_size);
		// Corresponding to the volatile read in get_impl() and putIfMatch in
		// Cliff Click's Java implementation
		slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
		/**
			@Begin
			# This is a complicated potential commit point since many many functions are
			# calling val().
			@Potential_commit_point_define: true
			@Label: Read_Val_Point
			@End
		*/
		return res;
	}
545
546         static int hash(slot *key_slot) {
547                 assert(key_slot != NULL && key_slot->_ptr != NULL);
548                 shared_ptr<TypeK> key = static_pointer_cast<TypeK>(key_slot->_ptr);
549                 int h = key->hashCode();
550                 // Spread bits according to Cliff Click's code
551                 h += (h << 15) ^ 0xffffcd7d;
552                 h ^= (h >> 10);
553                 h += (h << 3);
554                 h ^= (h >> 6);
555                 h += (h << 2) + (h << 14);
556                 return h ^ (h >> 16);
557         }
558         
559         // Heuristic to decide if reprobed too many times. 
560         // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
561         // put it triggers a table resize. Several places MUST have exact agreement.
562         static int reprobe_limit(int len) {
563                 return REPROBE_LIMIT + (len >> 2);
564         }
565         
566         static inline bool is_prime(slot *val) {
567                 return (val != NULL) && val->_prime;
568         }
569
	// Check for key equality. Try direct pointer comparison first (fast
	// negative test) and then the full 'equals' call.
	// NOTE(review): 'hash' is actually the probe index into 'hashes'
	// (get_impl passes idx here), and K->_ptr -- a shared_ptr<void> -- is
	// handed to TypeK::equals, although the class doc says equals takes a
	// TypeK; confirm the expected TypeK interface.
	static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
		int fullhash) {
		// Caller should've checked this.
		assert (K != NULL);
		shared_ptr<TypeK> key_ptr = static_pointer_cast<TypeK>(key_slot->_ptr);
		return
			K == key_slot ||
				((hashes[hash] == 0 || hashes[hash] == fullhash) &&
				K != TOMBSTONE &&
				key_ptr->equals(K->_ptr));
	}
583
584         static bool valeq(slot *val_slot1, slot *val_slot2) {
585                 assert (val_slot1 != NULL);
586                 shared_ptr<TypeK> ptr1 = static_pointer_cast<TypeV>(val_slot1->_ptr);
587                 if (val_slot2 == NULL || ptr1 == NULL) return false;
588                 return ptr1->equals(val_slot2->_ptr);
589         }
590         
591         // Together with key() preserve the happens-before relationship on newly
592         // inserted keys
593         static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
594                 return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
595                         desired, memory_order_release, memory_order_release);
596         }
597
598         /**
599                 Same as the val() function, we only label the CAS operation as the
600                 potential commit point.
601         */
602         // Together with val() preserve the happens-before relationship on newly
603         // inserted values
604         static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
605                 *desired) {
606                 bool res =  kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
607                         desired, memory_order_release, memory_order_release);
608                 /**
609                         # If it is a successful put instead of a copy or any other internal
610                         # operantions, expected != NULL
611                         @Begin
612                         @Potential_commit_point_define: __ATOMIC_RET__ == true
613                         @Label: Write_Val_Point
614                         @End
615                 */
616                 return res;
617         }
618
	/**
		Core lookup: probe 'kvs' for key_slot (linear reprobing), following
		an in-progress table copy if a primed value is seen. Returns the
		value slot, or NULL on a miss.
	*/
	slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
		fullhash) {
		int len = kvs->_size;
		CHM *chm = get_chm(kvs);
		int *hashes = get_hashes(kvs);

		int idx = fullhash & (len - 1);
		int reprobe_cnt = 0;
		while (true) {
			slot *K = key(kvs, idx);
			slot *V = val(kvs, idx);
			/**
				@Begin
				@Commit_point_define: V == NULL
				@Potential_commit_point_label: Read_Val_Point
				@Label: Get_Success_Point_1
				@End
			*/

			if (V == NULL) return NULL; // A miss
			
			if (keyeq(K, key_slot, hashes, idx, fullhash)) {
				// Key hit! Check if table-resize in progress
				if (!is_prime(V)) {
					/**
						@Begin
						@Commit_point_define: true
						@Potential_commit_point_label: Read_Val_Point
						@Label: Get_Success_Point_2
						@End
					*/
					return (V == TOMBSTONE) ? NULL : V; // Return this value
				}
				// Otherwise, finish the copy & retry in the new table
				return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, key_slot), key_slot, fullhash);
			}

			// NOTE(review): 'key_slot' is the caller-supplied probe key and
			// can never be TOMBSTONE; Cliff Click's Java code tests the slot
			// read from the table (K) here -- confirm intended condition.
			if (++reprobe_cnt >= REPROBE_LIMIT ||
				key_slot == TOMBSTONE) {
				// Retry in new table
				// Atomic read (acquire) can be here 
				kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
				/**
					@Begin
					@Commit_point_define_check: newkvs == NULL
					@Label: Get_Success_Point_3
					@End
				*/
				return newkvs == NULL ? NULL : get_impl(topmap,
					topmap->help_copy(newkvs), key_slot, fullhash);
			}

			idx = (idx + 1) & (len - 1); // Reprobe by 1
		}
	}
675
676         // A wrapper of the essential function putIfMatch()
677         shared_ptr<TypeV> putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
678                 // TODO: Should throw an exception rather return NULL
679                 if (old_val == NULL) {
680                         return NULL;
681                 }
682                 void *key_ptr = (void*) new TypeK(key);
683                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
684
685                 void *val_ptr = (void*) new TypeV(value);
686                 slot *value_slot = new slot(false, shared_ptr<void>(val_ptr));
687                 slot *res = putIfMatch(this, _kvs, key_slot, value_slot, old_val);
688                 // Only when copy_slot() call putIfMatch() will it return NULL
689                 assert (res != NULL); 
690                 assert (!is_prime(res));
691                 return res == TOMBSTONE ? NULL : static_pointer_cast<TypeV>(res->_ptr);
692         }
693
	/**
		Put, Remove, PutIfAbsent, etc will call this function. Return the old
		value. If the returned value is equals to the expVal (or expVal is
		NO_MATCH_OLD), then this function puts the val_slot to the table 'kvs'.
		Only copy_slot will pass a NULL expVal, and putIfMatch only returns a
		NULL if passed a NULL expVal.

		Algorithm: (1) spin to claim/find the key slot (reprobing linearly),
		(2) possibly trigger/complete a table resize, (3) CAS the value slot
		in, honoring the expected-old-value predicate.
	*/
	static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
		*key_slot, slot *val_slot, slot *expVal) {
		assert (val_slot != NULL);
		assert (!is_prime(val_slot));
		assert (!is_prime(expVal));

		int fullhash = hash(key_slot);
		int len = kvs->_size;
		CHM *chm = get_chm(kvs);
		int *hashes = get_hashes(kvs);
		// Table size is a power of two, so masking == modulo
		int idx = fullhash & (len - 1);

		// Claim a key slot
		int reprobe_cnt = 0;
		slot *K;
		slot *V;
		kvs_data *newkvs;
		
		while (true) { // Spin till we get a key slot
			K = key(kvs, idx);
			V = val(kvs, idx);
			if (K == NULL) { // Get a free slot
				// Removing a key that was never inserted: nothing to do
				if (val_slot == TOMBSTONE) return val_slot;
				// Claim the null key-slot
				if (CAS_key(kvs, idx, NULL, key_slot)) {
					chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
					hashes[idx] = fullhash; // Memorize full hash
					break;
				}
				K = key(kvs, idx); // CAS failed, get updated value
				assert (K != NULL);
			}

			// Key slot not null, there exists a Key here
			if (keyeq(K, key_slot, hashes, idx, fullhash))
				break; // Got it
			
			// Notice that the logic here should be consistent with that of get.
			// The first predicate means too many reprobes means nothing in the
			// old table.
			if (++reprobe_cnt >= reprobe_limit(len) ||
				K == TOMBSTONE) { // Found a Tombstone key, no more keys
				newkvs = chm->resize(topmap, kvs);
				// Help along an existing copy
				if (expVal != NULL) topmap->help_copy(newkvs);
				// Retry the whole operation against the new table
				return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
			}

			idx = (idx + 1) & (len - 1); // Reprobe
		} // End of spinning till we get a Key slot

		if (val_slot == V) return V; // Fast cutout for no-change
	
		// Here it tries to resize cause it doesn't want other threads to stop
		// its progress (eagerly try to resize soon)
		newkvs = chm->_newkvs.load(memory_order_acquire);
		if (newkvs == NULL &&
			((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
			newkvs = chm->resize(topmap, kvs); // Force the copy to start
		
		// Finish the copy and then put it in the new table
		if (newkvs != NULL)
			return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
				expVal), key_slot, val_slot, expVal);
		
		// Decided to update the existing table
		while (true) {
			assert (!is_prime(V));

			// Fail without writing when the current value V does not satisfy
			// the expected-old-value predicate (see the commit-point
			// annotations below for the per-operation failure cases).
			if (expVal != NO_MATCH_OLD &&
				V != expVal &&
				(expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
				!(V == NULL && expVal == TOMBSTONE) &&
				(expVal == NULL || !valeq(expVal, V))) {
				/**
					@Begin
					@Commit_point_define: expVal == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: PutIfAbsent_Fail_Point
						# This is a check for the PutIfAbsent() when the value
						# is not absent
					@End
				*/
				/**
					@Begin
					@Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: RemoveIfMatch_Fail_Point
					@End
				*/
				/**
					@Begin
					@Commit_point_define: !valeq(expVal, V)
					@Potential_commit_point_label: Read_Val_Point
					@Label: ReplaceIfMatch_Fail_Point
					@End
				*/
				return V; // Do not update!
			}

			if (CAS_val(kvs, idx, V, val_slot)) {
				/**
					@Begin
					# The only point where a successful put happens
					@Commit_point_define: true
					@Potential_commit_point_label: Write_Val_Point
					@Label: Write_Success_Point
					@End
				*/
				if (expVal != NULL) { // Not called by a table-copy
					// CAS succeeded, should adjust size
					// Both normal put's and table-copy calls putIfMatch, but
					// table-copy does not increase the number of live K/V pairs
					if ((V == NULL || V == TOMBSTONE) &&
						val_slot != TOMBSTONE)
						chm->_size.fetch_add(1, memory_order_relaxed);
					if (!(V == NULL || V == TOMBSTONE) &&
						val_slot == TOMBSTONE)
						chm->_size.fetch_add(-1, memory_order_relaxed);
				}
				// An empty previous slot reads as TOMBSTONE ("no prior value")
				// for normal callers; copy_slot (expVal == NULL) sees NULL
				return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
			}
			// Else CAS failed
			V = val(kvs, idx);
			// A primed value means a copy intervened: finish copying this
			// slot, then retry against the new table
			if (is_prime(V))
				return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, expVal), key_slot, val_slot, expVal);
		}
	}
830
831         // Help along an existing table-resize. This is a fast cut-out wrapper.
832         kvs_data* help_copy(kvs_data *helper) {
833                 kvs_data *topkvs = _kvs.load(memory_order_acquire);
834                 CHM *topchm = get_chm(topkvs);
835                 // No cpy in progress
836                 if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
837                 topchm->help_copy_impl(this, topkvs, false);
838                 return helper;
839         }
840 };
841
842 #endif